Skip to content

Commit

Permalink
Disallow UNION
Browse files Browse the repository at this point in the history
  • Loading branch information
okumin committed Aug 4, 2024
1 parent dbfa680 commit 9e09665
Showing 1 changed file with 5 additions and 33 deletions.
38 changes: 5 additions & 33 deletions tez-examples/src/main/java/org/apache/tez/examples/Demuxer.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@
import org.apache.tez.common.Preconditions;
import org.apache.tez.dag.api.DAG;
import org.apache.tez.dag.api.Edge;
import org.apache.tez.dag.api.GroupInputEdge;
import org.apache.tez.dag.api.InputDescriptor;
import org.apache.tez.dag.api.ProcessorDescriptor;
import org.apache.tez.dag.api.TezConfiguration;
import org.apache.tez.dag.api.Vertex;
Expand All @@ -51,7 +49,6 @@
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration;
import org.apache.tez.runtime.library.api.TezRuntimeConfiguration.ReportPartitionStats;
import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig;
import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValuesInput;
import org.apache.tez.runtime.library.partitioner.HashPartitioner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -101,6 +98,10 @@ protected int runJob(String[] args, TezConfiguration tezConf, TezClient tezClien
.stream(inputDirs.split(","))
.map(Path::new)
.collect(Collectors.toList());
if (inputPaths.size() > 1) {
System.err.println("Input paths must contain exactly one path until TEZ-4508 is resolved");
return 3;
}
Path outputPath = new Path(outputDir);

FileSystem fs = outputPath.getFileSystem(tezConf);
Expand All @@ -109,9 +110,7 @@ protected int runJob(String[] args, TezConfiguration tezConf, TezClient tezClien
System.err.println("Output directory: " + outputDir + " already exists");
return 3;
}
DAG dag = inputPaths.size() == 1
? createDag(tezConf, inputPaths.get(0), outputPath, numPartitions)
: createDagWithUnion(tezConf, inputPaths, outputPath, numPartitions);
DAG dag = createDag(tezConf, inputPaths.get(0), outputPath, numPartitions);
LOG.info("Running Demuxer");
return runDag(dag, isCountersLog(), LOG);
}
Expand All @@ -134,33 +133,6 @@ private DAG createDag(TezConfiguration tezConf, Path inputPath, Path outputPath,
.addEdge(Edge.create(inputVertex, demuxVertex, edgeConf.createDefaultEdgeProperty()));
}

private DAG createDagWithUnion(TezConfiguration tezConf, List<Path> inputPaths, Path outputPath,
int numPartitions) {
Vertex[] inputVertices = new Vertex[inputPaths.size()];
for (int i = 0; i < inputPaths.size(); i++) {
inputVertices[i] = createInputVertex(tezConf, "input-" + i, inputPaths.get(i));
}

Vertex demuxVertex = createDemuxVertex(tezConf, outputPath, numPartitions);

OrderedPartitionedKVEdgeConfig edgeConf = OrderedPartitionedKVEdgeConfig
.newBuilder(Text.class.getName(), NullWritable.class.getName(),
HashPartitioner.class.getName())
.setFromConfiguration(tezConf)
.build();

DAG dag = DAG.create("Demuxer");
Arrays.stream(inputVertices).forEach(dag::addVertex);
return dag
.addVertex(demuxVertex)
.addEdge(
GroupInputEdge.create(
dag.createVertexGroup("union", inputVertices),
demuxVertex,
edgeConf.createDefaultEdgeProperty(),
InputDescriptor.create(ConcatenatedMergedKeyValuesInput.class.getName())));
}

private Vertex createInputVertex(TezConfiguration tezConf, String vertexName, Path path) {
// This vertex represents an input vertex for the demuxer. It reads text data using the
// TextInputFormat. ForwardingProcessor simply forwards the data downstream as is.
Expand Down

0 comments on commit 9e09665

Please sign in to comment.