runners/google-cloud-dataflow-java/build.gradle (2 additions, 4 deletions)

@@ -205,7 +205,6 @@ def commonLegacyExcludeCategories = [
'org.apache.beam.sdk.testing.UsesGaugeMetrics',
'org.apache.beam.sdk.testing.UsesMultimapState',
'org.apache.beam.sdk.testing.UsesTestStream',
-'org.apache.beam.sdk.testing.UsesParDoLifecycle', // doesn't support remote runner
'org.apache.beam.sdk.testing.UsesMetricsPusher',
'org.apache.beam.sdk.testing.UsesBundleFinalizer',
'org.apache.beam.sdk.testing.UsesBoundedTrieMetrics', // Dataflow QM as of now does not support returning back BoundedTrie in metric result.
@@ -520,8 +519,7 @@ task validatesRunnerV2 {
excludedTests: [
'org.apache.beam.sdk.transforms.ReshuffleTest.testReshuffleWithTimestampsStreaming',

-// TODO(https://github.com/apache/beam/issues/18592): respect ParDo lifecycle.
-'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testFnCallSequenceStateful',
+// These tests use static state and don't work with remote execution.
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundle',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundleStateful',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElement',
@@ -563,7 +561,7 @@ task validatesRunnerV2Streaming {
'org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState',
'org.apache.beam.sdk.transforms.GroupByKeyTest.testCombiningAccumulatingProcessingTime',

-// TODO(https://github.com/apache/beam/issues/18592): respect ParDo lifecycle.
+// These tests use static state and don't work with remote execution.
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundle',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInFinishBundleStateful',
'org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElement',
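A note on the new exclusion rationale: the pattern these ParDoLifecycleTest cases rely on looks roughly like the hypothetical sketch below (illustrative, not the actual test code). Lifecycle calls are recorded in a static collection, so a test's assertions only observe them when the DoFn executes in the same JVM; under a remote runner such as Dataflow the fn is serialized and run on worker JVMs, and the workers' static state never reaches the test.

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.beam.sdk.transforms.DoFn;

// Hypothetical lifecycle-tracking fn in the style of ParDoLifecycleTest.
class LifecycleTrackingFn extends DoFn<String, String> {
  // Static state: visible only within a single JVM.
  static final List<String> CALLS = new CopyOnWriteArrayList<>();

  @Setup
  public void setup() {
    CALLS.add("setup");
  }

  @ProcessElement
  public void process(ProcessContext c) {
    CALLS.add("processElement");
    c.output(c.element());
  }

  @Teardown
  public void teardown() {
    CALLS.add("teardown");
  }
}

// An assertion over LifecycleTrackingFn.CALLS passes on the direct runner,
// but under remote execution the list in the test JVM stays empty.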
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/worker/IntrinsicMapTaskExecutorFactory.java

@@ -105,11 +105,32 @@ public DataflowMapTaskExecutor create(
Networks.replaceDirectedNetworkNodes(
network, createOutputReceiversTransform(stageName, counterSet));

-// Swap out all the ParallelInstruction nodes with Operation nodes
-Networks.replaceDirectedNetworkNodes(
-    network,
-    createOperationTransformForParallelInstructionNodes(
-        stageName, network, options, readerFactory, sinkFactory, executionContext));
+// Swap out all the ParallelInstruction nodes with Operation nodes. While updating
+// the network, we keep track of the created Operations so that if an exception
+// is encountered we can properly abort started operations.
+ArrayList<Operation> createdOperations = new ArrayList<>();
+try {
+  Networks.replaceDirectedNetworkNodes(
+      network,
+      createOperationTransformForParallelInstructionNodes(
+          stageName,
+          network,
+          options,
+          readerFactory,
+          sinkFactory,
+          executionContext,
+          createdOperations));
+} catch (RuntimeException exn) {
+  for (Operation o : createdOperations) {
+    try {
+      o.abort();
+    } catch (Exception exn2) {
+      exn.addSuppressed(exn2);
+    }
+  }
+  throw exn;
+}

// Collect all the operations within the network and attach all the operations as receivers
// to preceding output receivers.
@@ -144,7 +165,8 @@ Function<Node, Node> createOperationTransformForParallelInstructionNodes(
final PipelineOptions options,
final ReaderFactory readerFactory,
final SinkFactory sinkFactory,
-final DataflowExecutionContext<?> executionContext) {
+final DataflowExecutionContext<?> executionContext,
+final List<Operation> createdOperations) {

return new TypeSafeNodeFunction<ParallelInstructionNode>(ParallelInstructionNode.class) {
@Override
@@ -156,27 +178,31 @@ public Node typedApply(ParallelInstructionNode node) {
instruction.getOriginalName(),
instruction.getSystemName(),
instruction.getName());
+OperationNode result;
try {
DataflowOperationContext context = executionContext.createOperationContext(nameContext);
if (instruction.getRead() != null) {
-return createReadOperation(
-    network, node, options, readerFactory, executionContext, context);
+result =
+    createReadOperation(
+        network, node, options, readerFactory, executionContext, context);
} else if (instruction.getWrite() != null) {
-return createWriteOperation(node, options, sinkFactory, executionContext, context);
+result = createWriteOperation(node, options, sinkFactory, executionContext, context);
} else if (instruction.getParDo() != null) {
-return createParDoOperation(network, node, options, executionContext, context);
+result = createParDoOperation(network, node, options, executionContext, context);
} else if (instruction.getPartialGroupByKey() != null) {
-return createPartialGroupByKeyOperation(
-    network, node, options, executionContext, context);
+result =
+    createPartialGroupByKeyOperation(network, node, options, executionContext, context);
} else if (instruction.getFlatten() != null) {
-return createFlattenOperation(network, node, context);
+result = createFlattenOperation(network, node, context);
} else {
throw new IllegalArgumentException(
String.format("Unexpected instruction: %s", instruction));
}
} catch (Exception e) {
throw new RuntimeException(e);
}
+createdOperations.add(result.getOperation());
+return result;
}
};
}
@@ -328,7 +354,6 @@ public Node typedApply(InstructionOutputNode input) {
Coder<?> coder =
CloudObjects.coderFromCloudObject(CloudObject.fromSpec(cloudOutput.getCodec()));

-@SuppressWarnings("unchecked")
ElementCounter outputCounter =
new DataflowOutputCounter(
cloudOutput.getName(),
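The net effect of this factory change, condensed into a standalone sketch (Operation and abort mirror the diff; the builder harness around them is assumed for illustration): if creating any operation fails, every operation created so far is aborted, and secondary failures from abort are attached with addSuppressed so the original exception stays primary.

import java.util.ArrayList;
import java.util.List;

class OperationGraphBuilder {

  interface Operation {
    void abort() throws Exception;
  }

  interface OperationFactory {
    Operation create(); // may throw RuntimeException
  }

  // Create all operations, or abort the partially built set on failure so
  // nothing is left half-started.
  static List<Operation> buildAll(List<OperationFactory> factories) {
    ArrayList<Operation> created = new ArrayList<>();
    try {
      for (OperationFactory factory : factories) {
        created.add(factory.create());
      }
      return created;
    } catch (RuntimeException exn) {
      for (Operation op : created) {
        try {
          op.abort();
        } catch (Exception exn2) {
          exn.addSuppressed(exn2); // keep the original failure primary
        }
      }
      throw exn;
    }
  }
}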
runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/worker/util/common/worker/MapTaskExecutor.java

@@ -18,8 +18,8 @@
package org.apache.beam.runners.dataflow.worker.util.common.worker;

import java.io.Closeable;
+import java.util.ArrayList;
import java.util.List;
-import java.util.ListIterator;
import org.apache.beam.runners.core.metrics.ExecutionStateTracker;
import org.apache.beam.runners.dataflow.worker.counters.CounterSet;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
@@ -36,7 +36,9 @@ public class MapTaskExecutor implements WorkExecutor {
private static final Logger LOG = LoggerFactory.getLogger(MapTaskExecutor.class);

/** The operations in the map task, in execution order. */
-public final List<Operation> operations;
+public final ArrayList<Operation> operations;
+
+private boolean closed = false;

private final ExecutionStateTracker executionStateTracker;

@@ -54,7 +56,7 @@ public MapTaskExecutor(
CounterSet counters,
ExecutionStateTracker executionStateTracker) {
this.counters = counters;
-this.operations = operations;
+this.operations = new ArrayList<>(operations);
this.executionStateTracker = executionStateTracker;
}

@@ -63,6 +65,7 @@ public CounterSet getOutputCounters() {
return counters;
}

+/** May be reused if execute() returns without an exception being thrown. */
@Override
public void execute() throws Exception {
LOG.debug("Executing map task");
@@ -74,13 +77,11 @@ public void execute() throws Exception {
// Starting a root operation such as a ReadOperation does the work
// of processing the input dataset.
LOG.debug("Starting operations");
-ListIterator<Operation> iterator = operations.listIterator(operations.size());
-while (iterator.hasPrevious()) {
+for (int i = operations.size() - 1; i >= 0; --i) {
if (Thread.currentThread().isInterrupted()) {
throw new InterruptedException("Worker aborted");
}
-  Operation op = iterator.previous();
-  op.start();
+  operations.get(i).start();
}

// Finish operations, in forward-execution-order, so that a
@@ -94,16 +95,13 @@
op.finish();
}
} catch (Exception | Error exn) {
LOG.debug("Aborting operations", exn);
for (Operation op : operations) {
try {
op.abort();
} catch (Exception | Error exn2) {
exn.addSuppressed(exn2);
if (exn2 instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
}
try {
closeInternal();
} catch (Exception closeExn) {
exn.addSuppressed(closeExn);
}
if (exn instanceof InterruptedException) {
Thread.currentThread().interrupt();
}
throw exn;
}
@@ -164,6 +162,45 @@ public void abort() {
}
}

+private void closeInternal() throws Exception {
+  Preconditions.checkState(!closed);
+  LOG.debug("Aborting operations");
+  @Nullable Exception exn = null;
+  for (Operation op : operations) {
+    try {
+      op.abort();
+    } catch (Exception | Error exn2) {
+      if (exn2 instanceof InterruptedException) {
+        Thread.currentThread().interrupt();
+      }
+      if (exn == null) {
+        if (exn2 instanceof Exception) {
+          exn = (Exception) exn2;
+        } else {
+          exn = new RuntimeException(exn2);
+        }
+      } else {
+        exn.addSuppressed(exn2);
+      }
+    }
+  }
+  closed = true;
+  if (exn != null) {
+    throw exn;
+  }
+}
+
+@Override
+public void close() {
+  if (!closed) {
+    try {
+      closeInternal();
+    } catch (Exception e) {
+      LOG.error("Exception while closing MapTaskExecutor, ignoring", e);
+    }
+  }
+}
+
@Override
public List<Integer> reportProducedEmptyOutput() {
List<Integer> emptyOutputSinkIndexes = Lists.newArrayList();
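Taken together, the executor now has an explicit lifecycle. A rough caller-side sketch of the intended contract (the MapTaskExecutor methods are from the diff; the driver loop is assumed): execute() may be called again on the same instance only while it keeps succeeding, a failed execute() aborts the operations itself via closeInternal(), and close() is an idempotent final cleanup that logs rather than propagates late abort failures.

// Hypothetical driver illustrating the reuse-then-close contract.
void runMapTask(MapTaskExecutor executor, int passes) throws Exception {
  try {
    for (int i = 0; i < passes; i++) {
      // Per the new Javadoc, reuse is only allowed when the previous
      // execute() returned without throwing.
      executor.execute();
    }
  } finally {
    // No-op if a failed execute() already ran closeInternal() and set
    // 'closed'; otherwise aborts all operations, logging any failure.
    executor.close();
  }
}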