TaskExecutor.java
@@ -30,6 +30,7 @@
import org.apache.kafka.streams.processor.TaskId;

import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Map.Entry;
@@ -47,12 +48,17 @@ public class TaskExecutor {

private final Logger log;

private final boolean hasNamedTopologies;
private final ProcessingMode processingMode;
private final Tasks tasks;

public TaskExecutor(final Tasks tasks, final ProcessingMode processingMode, final LogContext logContext) {
public TaskExecutor(final Tasks tasks,
final ProcessingMode processingMode,
final boolean hasNamedTopologies,
final LogContext logContext) {
this.tasks = tasks;
this.processingMode = processingMode;
this.hasNamedTopologies = hasNamedTopologies;
this.log = logContext.logger(getClass());
}

@@ -62,9 +68,16 @@ public TaskExecutor(final Tasks tasks, final ProcessingMode processingMode, fina
*/
int process(final int maxNumRecords, final Time time) {
int totalProcessed = 0;

for (final Task task : tasks.activeTasks()) {
totalProcessed += processTask(task, maxNumRecords, time);
Task lastProcessed = null;
try {
for (final Task task : tasks.activeTasks()) {
lastProcessed = task;
totalProcessed += processTask(task, maxNumRecords, time);
}
} catch (final Exception e) {
tasks.removeTaskFromCuccessfullyProcessedBeforeClosing(lastProcessed);
Contributor

If an exception is thrown here, it means we should not have reached line 93 below and hence this task should not have been added to the set, and hence we do not need to ever call removeTaskFromCuccessfullyProcessedBeforeClosing, right?

Contributor Author

Only if we hit the error in the first set of processing. We could have already processed it once this iteration and are coming back to do it again. I ran into this issue while testing and I'm pretty sure that is why it is coming up again.

commitSuccessfullyProcessedTasks();
throw e;
}

return totalProcessed;
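To make the exchange above concrete, here is a minimal standalone sketch (hypothetical task IDs, not part of the PR) of the interleaving the author describes: a task can enter the successfully-processed set during an earlier process() pass and still be the task that throws on a later pass, so it has to be dropped from the set again before the remaining tasks are committed.

import java.util.HashSet;
import java.util.Set;

public class SuccessfullyProcessedSketch {
    public static void main(final String[] args) {
        final Set<String> successfullyProcessed = new HashSet<>();

        // Pass 1: both tasks process their buffered records cleanly and are recorded.
        successfullyProcessed.add("0_0");
        successfullyProcessed.add("0_1");

        // Pass 2: task 0_0 throws while processing more records. It is still in the set
        // from pass 1, so it must be removed before committing, otherwise its offsets
        // would be committed even though its latest records failed.
        final String lastProcessed = "0_0";
        successfullyProcessed.remove(lastProcessed);

        // Only tasks that never failed get their offsets committed.
        System.out.println("Committing offsets for: " + successfullyProcessed); // prints [0_1]
    }
}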
@@ -80,6 +93,10 @@ private long processTask(final Task task, final int maxNumRecords, final Time ti
task.clearTaskTimeout();
processed++;
}
// TODO: enable regardless of whether using named topologies
if (hasNamedTopologies && processingMode != EXACTLY_ONCE_V2) {
tasks.addToSuccessfullyProcessed(task);
}
} catch (final TimeoutException timeoutException) {
task.maybeInitTaskTimeoutOrThrow(now, timeoutException);
log.debug(
@@ -139,8 +156,6 @@ int commitTasksAndMaybeUpdateCommittableOffsets(final Collection<Task> tasksToCo
return committed;
}



/**
* Caution: do not invoke this directly if it's possible a rebalance is occurring, as the commit will fail. If
* this is a possibility, prefer the {@link #commitTasksAndMaybeUpdateCommittableOffsets} instead.
@@ -234,6 +249,16 @@ private void updateTaskCommitMetadata(final Map<TopicPartition, OffsetAndMetadat
}
}

private void commitSuccessfullyProcessedTasks() {
if (!tasks.successfullyProcessed().isEmpty()) {
log.info("Streams encountered an error when processing tasks." +
" Will commit all previously successfully processed tasks {}",
tasks.successfullyProcessed().toString());
Member

For future reference, you can skip the toString() -- the logger should make the conversion implicitly

commitTasksAndMaybeUpdateCommittableOffsets(tasks.successfullyProcessed(), new HashMap<>());
}
tasks.clearSuccessfullyProcessed();
}
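As the reviewer notes above, the explicit toString() is redundant: the logger obtained from LogContext is an SLF4J logger, and SLF4J-style parameterized logging converts each {} argument via its toString() only when the message is actually rendered. A minimal sketch (the class name and task list are illustrative, not from the PR):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Arrays;
import java.util.List;

public class LoggingSketch {
    private static final Logger LOG = LoggerFactory.getLogger(LoggingSketch.class);

    public static void main(final String[] args) {
        final List<String> taskIds = Arrays.asList("0_0", "0_1");
        // The {} placeholder is filled from the argument's toString(), and only if the
        // log level is enabled, so no explicit toString() call is needed.
        LOG.info("Will commit all previously successfully processed tasks {}", taskIds);
    }
}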

/**
* @throws TaskMigratedException if the task producer got fenced (EOS only)
*/
TaskManager.java
@@ -116,7 +116,7 @@ public class TaskManager {
this.log = logContext.logger(getClass());

this.tasks = new Tasks(logContext, topologyMetadata, streamsMetrics, activeTaskCreator, standbyTaskCreator);
this.taskExecutor = new TaskExecutor(tasks, processingMode, logContext);
this.taskExecutor = new TaskExecutor(tasks, processingMode, topologyMetadata.hasNamedTopologies(), logContext);
}

void setMainConsumer(final Consumer<byte[], byte[]> mainConsumer) {
Tasks.java
@@ -59,6 +59,7 @@ class Tasks {
// TODO: change type to `StandbyTask`
private final Map<TaskId, Task> readOnlyStandbyTasksPerId = Collections.unmodifiableMap(standbyTasksPerId);
private final Set<TaskId> readOnlyStandbyTaskIds = Collections.unmodifiableSet(standbyTasksPerId.keySet());
private final Collection<Task> successfullyProcessed = new HashSet<>();

private final ActiveTaskCreator activeTaskCreator;
private final StandbyTaskCreator standbyTaskCreator;
Expand Down Expand Up @@ -319,6 +320,22 @@ Consumer<byte[], byte[]> mainConsumer() {
return mainConsumer;
}

Collection<Task> successfullyProcessed() {
return successfullyProcessed;
}

void addToSuccessfullyProcessed(final Task task) {
successfullyProcessed.add(task);
}

void removeTaskFromCuccessfullyProcessedBeforeClosing(final Task task) {
successfullyProcessed.remove(task);
}

void clearSuccessfullyProcessed() {
successfullyProcessed.clear();
}

// for testing only
void addTask(final Task task) {
if (task.isActive()) {
EmitOnChangeIntegrationTest.java
@@ -30,6 +30,8 @@
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.processor.internals.namedtopology.KafkaStreamsNamedTopologyWrapper;
import org.apache.kafka.streams.processor.internals.namedtopology.NamedTopologyBuilder;
import org.apache.kafka.test.IntegrationTest;
import org.apache.kafka.test.StreamsTestUtils;
import org.apache.kafka.test.TestUtils;
@@ -45,11 +47,14 @@
import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.common.utils.Utils.mkObjectProperties;
import static org.apache.kafka.streams.integration.utils.IntegrationTestUtils.safeUniqueTestName;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

@Category(IntegrationTest.class)
public class EmitOnChangeIntegrationTest {
@@ -70,16 +75,20 @@ public static void closeCluster() {
public TestName testName = new TestName();

private static String inputTopic;
private static String inputTopic2;
private static String outputTopic;
private static String outputTopic2;
private static String appId = "";

@Before
public void setup() {
final String testId = safeUniqueTestName(getClass(), testName);
appId = "appId_" + testId;
inputTopic = "input" + testId;
inputTopic2 = "input2" + testId;
outputTopic = "output" + testId;
IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic, outputTopic);
outputTopic2 = "output2" + testId;
IntegrationTestUtils.cleanStateBeforeTest(CLUSTER, inputTopic, outputTopic, inputTopic2, outputTopic2);
}

@Test
@@ -110,6 +119,7 @@ public void shouldEmitSameRecordAfterFailover() throws Exception {
}
})
.to(outputTopic);
builder.stream(inputTopic2).to(outputTopic2);

try (final KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), properties)) {
kafkaStreams.setUncaughtExceptionHandler(exception -> StreamThreadExceptionResponse.REPLACE_THREAD);
@@ -128,6 +138,19 @@ public void shouldEmitSameRecordAfterFailover() throws Exception {
new Properties()),
0L);

IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
inputTopic2,
Arrays.asList(
new KeyValue<>(1, "A"),
new KeyValue<>(1, "B")
),
TestUtils.producerConfig(
CLUSTER.bootstrapServers(),
IntegerSerializer.class,
StringSerializer.class,
new Properties()),
0L);

IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
TestUtils.consumerConfig(
CLUSTER.bootstrapServers(),
@@ -140,6 +163,91 @@ public void shouldEmitSameRecordAfterFailover() throws Exception {
new KeyValue<>(1, "B")
)
);
IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
TestUtils.consumerConfig(
CLUSTER.bootstrapServers(),
IntegerDeserializer.class,
StringDeserializer.class
),
outputTopic2,
Arrays.asList(
new KeyValue<>(1, "A"),
new KeyValue<>(1, "B")
)
);
}
}

@Test
public void shouldEmitRecordsAfterFailures() throws Exception {
final Properties properties = mkObjectProperties(
mkMap(
mkEntry(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()),
mkEntry(StreamsConfig.APPLICATION_ID_CONFIG, appId),
mkEntry(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath()),
mkEntry(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 1),
mkEntry(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, 0),
mkEntry(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 300000L),
mkEntry(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.IntegerSerde.class),
mkEntry(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.StringSerde.class),
mkEntry(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000)
)
);

try (final KafkaStreamsNamedTopologyWrapper kafkaStreams = new KafkaStreamsNamedTopologyWrapper(properties)) {
kafkaStreams.setUncaughtExceptionHandler(exception -> StreamThreadExceptionResponse.REPLACE_THREAD);

final NamedTopologyBuilder builder = kafkaStreams.newNamedTopologyBuilder("topology_A");
final AtomicInteger noOutputExpected = new AtomicInteger(0);
final AtomicInteger twoOutputExpected = new AtomicInteger(0);
builder.stream(inputTopic2).peek((k, v) -> twoOutputExpected.incrementAndGet()).to(outputTopic2);
builder.stream(inputTopic)
.peek((k, v) -> {
throw new RuntimeException("Kaboom");
})
.peek((k, v) -> noOutputExpected.incrementAndGet())
.to(outputTopic);

kafkaStreams.addNamedTopology(builder.build());

StreamsTestUtils.startKafkaStreamsAndWaitForRunningState(kafkaStreams);
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
inputTopic,
Arrays.asList(
new KeyValue<>(1, "A")
),
TestUtils.producerConfig(
CLUSTER.bootstrapServers(),
IntegerSerializer.class,
StringSerializer.class,
new Properties()),
0L);
IntegrationTestUtils.produceKeyValuesSynchronouslyWithTimestamp(
inputTopic2,
Arrays.asList(
new KeyValue<>(1, "A"),
new KeyValue<>(1, "B")
),
TestUtils.producerConfig(
CLUSTER.bootstrapServers(),
IntegerSerializer.class,
StringSerializer.class,
new Properties()),
0L);
IntegrationTestUtils.waitUntilFinalKeyValueRecordsReceived(
Contributor

Should we also check that outputTopic never sees the record 1 -> A, since that task kept throwing an exception?

Contributor Author

I will count using peek after the exception.

TestUtils.consumerConfig(
CLUSTER.bootstrapServers(),
IntegerDeserializer.class,
StringDeserializer.class
),
outputTopic2,
Arrays.asList(
new KeyValue<>(1, "A"),
Contributor

Also, I think by just checking that 1->A and 1->B are there we do not guarantee there are no duplicates due to re-processing, right? I think we should check that the offset on the input topic can be committed and also that there are no duplicates in the output.

Contributor Author

Yeah, good idea. I will check to make sure it doesn't get processed more than once.

new KeyValue<>(1, "B")
)
);
assertThat(noOutputExpected.get(), equalTo(0));
assertThat(twoOutputExpected.get(), equalTo(2));
}
}
}
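For the reviewer's remaining suggestion, verifying that the input offsets actually get committed, one possible check is sketched below. It is not part of the PR: it assumes the Streams application id doubles as the consumer group id (which is how Kafka Streams names its group), that inputTopic2 has a single partition, that the Admin, AdminClientConfig, TopicPartition, OffsetAndMetadata, and Map imports are added, and that the block sits at the end of the new test (which already declares throws Exception); a real test would likely poll for the condition rather than assert once, since the error-path commit happens asynchronously to the test thread.

// Sketch only: verify that both records on inputTopic2 were committed by the
// error-path commit, even though the inputTopic task keeps failing.
try (final Admin admin = Admin.create(mkObjectProperties(mkMap(
        mkEntry(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()))))) {
    final Map<TopicPartition, OffsetAndMetadata> committed =
        admin.listConsumerGroupOffsets(appId).partitionsToOffsetAndMetadata().get();
    // Two records were produced to inputTopic2, so its committed offset should reach 2.
    assertThat(committed.get(new TopicPartition(inputTopic2, 0)).offset(), equalTo(2L));
}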