From dbfa6809607675bdad19c77a61f4f45aa023dae5 Mon Sep 17 00:00:00 2001 From: okumin Date: Mon, 11 Dec 2023 22:13:28 +0900 Subject: [PATCH 1/2] TEZ-3268: Provide a demuxer sample app that uses fair routing --- .../java/org/apache/tez/examples/Demuxer.java | 231 ++++++++++++++++++ .../apache/tez/examples/DemuxerDataGen.java | 170 +++++++++++++ .../apache/tez/examples/ExampleDriver.java | 4 + 3 files changed, 405 insertions(+) create mode 100644 tez-examples/src/main/java/org/apache/tez/examples/Demuxer.java create mode 100644 tez-examples/src/main/java/org/apache/tez/examples/DemuxerDataGen.java diff --git a/tez-examples/src/main/java/org/apache/tez/examples/Demuxer.java b/tez-examples/src/main/java/org/apache/tez/examples/Demuxer.java new file mode 100644 index 0000000000..81704c3392 --- /dev/null +++ b/tez-examples/src/main/java/org/apache/tez/examples/Demuxer.java @@ -0,0 +1,231 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.examples; + +import java.util.Arrays; +import java.util.List; +import java.util.stream.Collectors; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.util.ToolRunner; +import org.apache.tez.client.TezClient; +import org.apache.tez.common.Preconditions; +import org.apache.tez.dag.api.DAG; +import org.apache.tez.dag.api.Edge; +import org.apache.tez.dag.api.GroupInputEdge; +import org.apache.tez.dag.api.InputDescriptor; +import org.apache.tez.dag.api.ProcessorDescriptor; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.Vertex; +import org.apache.tez.dag.library.vertexmanager.FairShuffleVertexManager; +import org.apache.tez.dag.library.vertexmanager.FairShuffleVertexManager.FairRoutingType; +import org.apache.tez.examples.HashJoinExample.ForwardingProcessor; +import org.apache.tez.mapreduce.input.MRInput; +import org.apache.tez.mapreduce.output.MultiMROutput; +import org.apache.tez.mapreduce.processor.SimpleMRProcessor; +import org.apache.tez.runtime.api.LogicalInput; +import org.apache.tez.runtime.api.ProcessorContext; +import org.apache.tez.runtime.library.api.KeyValueWriterWithBasePath; +import org.apache.tez.runtime.library.api.KeyValuesReader; +import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; +import org.apache.tez.runtime.library.api.TezRuntimeConfiguration.ReportPartitionStats; +import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig; +import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValuesInput; +import org.apache.tez.runtime.library.partitioner.HashPartitioner; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Simple example of a demuxer tolerant of data skew problems. The example scans the source files, + * partitions records by values, and write them in /path/to/{value}-r-{task id}. + */ +public class Demuxer extends TezExampleBase { + private static final Logger LOG = LoggerFactory.getLogger(Demuxer.class); + + private static final String DEMUXER_OUTPUT = "demuxerOutput"; + + public static void main(String[] args) throws Exception { + Demuxer job = new Demuxer(); + int status = ToolRunner.run(new Configuration(), job, args); + System.exit(status); + } + + @Override + protected void printUsage() { + System.err.println("Usage: " + + "demuxer [isPrecise(default false)]"); + } + + @Override + protected int validateArgs(String[] otherArgs) { + if (otherArgs.length < 3 || otherArgs.length > 4) { + return 2; + } + return 0; + } + + @Override + protected int runJob(String[] args, TezConfiguration tezConf, TezClient tezClient) + throws Exception { + String inputDirs = args[0]; + String outputDir = args[1]; + int numPartitions = Integer.parseInt(args[2]); + if (args.length == 4 && Boolean.parseBoolean(args[3])) { + tezConf.set( + TezRuntimeConfiguration.TEZ_RUNTIME_REPORT_PARTITION_STATS, + ReportPartitionStats.PRECISE.getType()); + } + + List inputPaths = Arrays + .stream(inputDirs.split(",")) + .map(Path::new) + .collect(Collectors.toList()); + Path outputPath = new Path(outputDir); + + FileSystem fs = outputPath.getFileSystem(tezConf); + outputPath = fs.makeQualified(outputPath); + if (fs.exists(outputPath)) { + System.err.println("Output directory: " + outputDir + " already exists"); + return 3; + } + DAG dag = inputPaths.size() == 1 + ? createDag(tezConf, inputPaths.get(0), outputPath, numPartitions) + : createDagWithUnion(tezConf, inputPaths, outputPath, numPartitions); + LOG.info("Running Demuxer"); + return runDag(dag, isCountersLog(), LOG); + } + + private DAG createDag(TezConfiguration tezConf, Path inputPath, Path outputPath, + int numPartitions) { + Vertex inputVertex = createInputVertex(tezConf, "input", inputPath); + Vertex demuxVertex = createDemuxVertex(tezConf, outputPath, numPartitions); + OrderedPartitionedKVEdgeConfig edgeConf = OrderedPartitionedKVEdgeConfig + .newBuilder( + Text.class.getName(), + NullWritable.class.getName(), + HashPartitioner.class.getName()) + .setFromConfiguration(tezConf) + .build(); + return DAG + .create("Demuxer") + .addVertex(inputVertex) + .addVertex(demuxVertex) + .addEdge(Edge.create(inputVertex, demuxVertex, edgeConf.createDefaultEdgeProperty())); + } + + private DAG createDagWithUnion(TezConfiguration tezConf, List inputPaths, Path outputPath, + int numPartitions) { + Vertex[] inputVertices = new Vertex[inputPaths.size()]; + for (int i = 0; i < inputPaths.size(); i++) { + inputVertices[i] = createInputVertex(tezConf, "input-" + i, inputPaths.get(i)); + } + + Vertex demuxVertex = createDemuxVertex(tezConf, outputPath, numPartitions); + + OrderedPartitionedKVEdgeConfig edgeConf = OrderedPartitionedKVEdgeConfig + .newBuilder(Text.class.getName(), NullWritable.class.getName(), + HashPartitioner.class.getName()) + .setFromConfiguration(tezConf) + .build(); + + DAG dag = DAG.create("Demuxer"); + Arrays.stream(inputVertices).forEach(dag::addVertex); + return dag + .addVertex(demuxVertex) + .addEdge( + GroupInputEdge.create( + dag.createVertexGroup("union", inputVertices), + demuxVertex, + edgeConf.createDefaultEdgeProperty(), + InputDescriptor.create(ConcatenatedMergedKeyValuesInput.class.getName()))); + } + + private Vertex createInputVertex(TezConfiguration tezConf, String vertexName, Path path) { + // This vertex represents an input vertex for the demuxer. It reads text data using the + // TextInputFormat. ForwardingProcessor simply forwards the data downstream as is. + return Vertex + .create(vertexName, ProcessorDescriptor.create(ForwardingProcessor.class.getName())) + .addDataSource( + "inputFile", + MRInput + .createConfigBuilder( + new Configuration(tezConf), + TextInputFormat.class, + path.toUri().toString()) + .groupSplits(!isDisableSplitGrouping()) + .generateSplitsInAM(!isGenerateSplitInClient()).build()); + } + + private Vertex createDemuxVertex(TezConfiguration tezConf, Path outputPath, int numPartitions) { + // This vertex demuxes records based on the keys. Multiple reduce tasks can process the same key + // as fair routing is configured. + return Vertex + .create( + "demuxer", + ProcessorDescriptor.create(DemuxProcessor.class.getName()), numPartitions) + .setVertexManagerPlugin( + FairShuffleVertexManager + .createConfigBuilder(tezConf) + .setAutoParallelism(FairRoutingType.FAIR_PARALLELISM) + // These params demonstrate perfect fair routing + .setSlowStartMinSrcCompletionFraction(1.0f) + .setSlowStartMaxSrcCompletionFraction(1.0f) + .build()) + .addDataSink( + DEMUXER_OUTPUT, + MultiMROutput + .createConfigBuilder( + new Configuration(tezConf), + TextOutputFormat.class, + outputPath.toUri().toString(), + false) + .build()); + } + + public static class DemuxProcessor extends SimpleMRProcessor { + public DemuxProcessor(ProcessorContext context) { + super(context); + } + + @Override + public void run() throws Exception { + Preconditions.checkArgument(!getInputs().isEmpty()); + Preconditions.checkArgument(getOutputs().size() == 1); + KeyValueWriterWithBasePath kvWriter = (KeyValueWriterWithBasePath) getOutputs() + .get(DEMUXER_OUTPUT) + .getWriter(); + for (LogicalInput input : getInputs().values()) { + KeyValuesReader kvReader = (KeyValuesReader) input.getReader(); + while (kvReader.next()) { + Text category = (Text) kvReader.getCurrentKey(); + String path = category.toString(); + for (Object value : kvReader.getCurrentValues()) { + assert value == NullWritable.get(); + kvWriter.write(category, NullWritable.get(), path); + } + } + } + } + } +} diff --git a/tez-examples/src/main/java/org/apache/tez/examples/DemuxerDataGen.java b/tez-examples/src/main/java/org/apache/tez/examples/DemuxerDataGen.java new file mode 100644 index 0000000000..82174b32ce --- /dev/null +++ b/tez-examples/src/main/java/org/apache/tez/examples/DemuxerDataGen.java @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.examples; + +import java.io.DataInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; +import org.apache.hadoop.util.ToolRunner; +import org.apache.tez.client.TezClient; +import org.apache.tez.common.Preconditions; +import org.apache.tez.common.io.NonSyncByteArrayInputStream; +import org.apache.tez.common.io.NonSyncByteArrayOutputStream; +import org.apache.tez.common.io.NonSyncDataOutputStream; +import org.apache.tez.dag.api.DAG; +import org.apache.tez.dag.api.ProcessorDescriptor; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.UserPayload; +import org.apache.tez.dag.api.Vertex; +import org.apache.tez.mapreduce.output.MROutput; +import org.apache.tez.mapreduce.processor.SimpleMRProcessor; +import org.apache.tez.runtime.api.ProcessorContext; +import org.apache.tez.runtime.library.api.KeyValueWriter; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Generates records with data skew. + */ +public class DemuxerDataGen extends TezExampleBase { + private static final Logger LOG = LoggerFactory.getLogger(DemuxerDataGen.class); + + private static final String OUTPUT_NAME = "output"; + + public static void main(String[] args) throws Exception { + DemuxerDataGen dataGen = new DemuxerDataGen(); + int status = ToolRunner.run(new Configuration(), dataGen, args); + System.exit(status); + } + + @Override + protected void printUsage() { + System.err.println("Usage: demuxerdatagen "); + ToolRunner.printGenericCommandUsage(System.err); + } + + @Override + protected int runJob(String[] args, TezConfiguration tezConf, TezClient tezClient) + throws Exception { + LOG.info("Running DemuxerDataGen"); + + String outDir = args[0]; + int numCategories = Integer.parseInt(args[1]); + int numTasks = args.length == 2 ? 1 : Integer.parseInt(args[2]); + + Path outputPath = new Path(outDir); + FileSystem fs = outputPath.getFileSystem(tezConf); + if (fs.exists(outputPath)) { + System.err.println("Output directory: " + outDir + " already exists"); + return 2; + } + + DAG dag = createDag(tezConf, outputPath, numCategories, numTasks); + + return runDag(dag, isCountersLog(), LOG); + } + + + @Override + protected int validateArgs(String[] otherArgs) { + if (otherArgs.length < 2 || otherArgs.length > 3) { + return 2; + } + return 0; + } + + private DAG createDag(TezConfiguration tezConf, Path outputPath, int numCategories, int numTasks) + throws IOException { + DAG dag = DAG.create("DemuxerDataGen"); + + Vertex genDataVertex = Vertex + .create( + "datagen", + ProcessorDescriptor + .create(GenDataProcessor.class.getName()) + .setUserPayload( + UserPayload.create( + ByteBuffer.wrap( + GenDataProcessor.createConfiguration(numCategories, numTasks)))), + numTasks) + .addDataSink(OUTPUT_NAME, + MROutput.createConfigBuilder(new Configuration(tezConf), + TextOutputFormat.class, outputPath.toUri().toString()).build()); + + dag.addVertex(genDataVertex); + + return dag; + } + + public static class GenDataProcessor extends SimpleMRProcessor { + + private static final Logger LOG = LoggerFactory.getLogger(GenDataProcessor.class); + + private int numCategories; + private int numTasks; + + public GenDataProcessor(ProcessorContext context) { + super(context); + } + + public static byte[] createConfiguration(int numCategories, int numTasks) throws IOException { + NonSyncByteArrayOutputStream bos = new NonSyncByteArrayOutputStream(); + NonSyncDataOutputStream dos = new NonSyncDataOutputStream(bos); + dos.writeInt(numCategories); + dos.writeInt(numTasks); + dos.close(); + bos.close(); + return bos.toByteArray(); + } + + @Override + public void initialize() throws Exception { + byte[] payload = getContext().getUserPayload().deepCopyAsArray(); + NonSyncByteArrayInputStream bis = new NonSyncByteArrayInputStream(payload); + DataInputStream dis = new DataInputStream(bis); + numCategories = dis.readInt(); + numTasks = dis.readInt(); + LOG.info("Initialized with numCategories={} and numTasks={}", numCategories, numTasks); + dis.close(); + bis.close(); + } + + @Override + public void run() throws Exception { + Preconditions.checkState(getInputs().isEmpty()); + Preconditions.checkState(getOutputs().size() == 1); + + KeyValueWriter outputWriter = (KeyValueWriter) getOutputs().get(OUTPUT_NAME).getWriter(); + + for (int i = 0; i < numCategories; i++) { + Text name = new Text(String.format("category-%05d", i)); + long numValues = Math.max(((long) Math.pow(2, i)) / numTasks, 1); + for (int j = 0; j < numValues; j++) { + outputWriter.write(name, NullWritable.get()); + } + } + } + } +} diff --git a/tez-examples/src/main/java/org/apache/tez/examples/ExampleDriver.java b/tez-examples/src/main/java/org/apache/tez/examples/ExampleDriver.java index c143e21d77..2078308600 100644 --- a/tez-examples/src/main/java/org/apache/tez/examples/ExampleDriver.java +++ b/tez-examples/src/main/java/org/apache/tez/examples/ExampleDriver.java @@ -48,6 +48,10 @@ public static void main(String[] argv){ "Validate data generated by joinexample and joindatagen"); pgd.addClass("cartesianproduct", CartesianProduct.class, "Cartesian product of two datasets"); + pgd.addClass("demuxer", Demuxer.class, + "Demuxer with FairShuffleVertexManager"); + pgd.addClass("demuxerdatagen", DemuxerDataGen.class, + "Generate data to run demuxer"); exitCode = pgd.run(argv); } catch(Throwable e){ e.printStackTrace(); From 9e0966541383e305d4505f23178aaebfc90181b5 Mon Sep 17 00:00:00 2001 From: okumin Date: Sun, 4 Aug 2024 21:35:42 +0900 Subject: [PATCH 2/2] Disallow UNION --- .../java/org/apache/tez/examples/Demuxer.java | 38 +++---------------- 1 file changed, 5 insertions(+), 33 deletions(-) diff --git a/tez-examples/src/main/java/org/apache/tez/examples/Demuxer.java b/tez-examples/src/main/java/org/apache/tez/examples/Demuxer.java index 81704c3392..a0e6d7ccf5 100644 --- a/tez-examples/src/main/java/org/apache/tez/examples/Demuxer.java +++ b/tez-examples/src/main/java/org/apache/tez/examples/Demuxer.java @@ -33,8 +33,6 @@ import org.apache.tez.common.Preconditions; import org.apache.tez.dag.api.DAG; import org.apache.tez.dag.api.Edge; -import org.apache.tez.dag.api.GroupInputEdge; -import org.apache.tez.dag.api.InputDescriptor; import org.apache.tez.dag.api.ProcessorDescriptor; import org.apache.tez.dag.api.TezConfiguration; import org.apache.tez.dag.api.Vertex; @@ -51,7 +49,6 @@ import org.apache.tez.runtime.library.api.TezRuntimeConfiguration; import org.apache.tez.runtime.library.api.TezRuntimeConfiguration.ReportPartitionStats; import org.apache.tez.runtime.library.conf.OrderedPartitionedKVEdgeConfig; -import org.apache.tez.runtime.library.input.ConcatenatedMergedKeyValuesInput; import org.apache.tez.runtime.library.partitioner.HashPartitioner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -101,6 +98,10 @@ protected int runJob(String[] args, TezConfiguration tezConf, TezClient tezClien .stream(inputDirs.split(",")) .map(Path::new) .collect(Collectors.toList()); + if (inputPaths.size() > 1) { + System.err.println("Input paths must contain exactly one path until TEZ-4508 is resolved"); + return 3; + } Path outputPath = new Path(outputDir); FileSystem fs = outputPath.getFileSystem(tezConf); @@ -109,9 +110,7 @@ protected int runJob(String[] args, TezConfiguration tezConf, TezClient tezClien System.err.println("Output directory: " + outputDir + " already exists"); return 3; } - DAG dag = inputPaths.size() == 1 - ? createDag(tezConf, inputPaths.get(0), outputPath, numPartitions) - : createDagWithUnion(tezConf, inputPaths, outputPath, numPartitions); + DAG dag = createDag(tezConf, inputPaths.get(0), outputPath, numPartitions); LOG.info("Running Demuxer"); return runDag(dag, isCountersLog(), LOG); } @@ -134,33 +133,6 @@ private DAG createDag(TezConfiguration tezConf, Path inputPath, Path outputPath, .addEdge(Edge.create(inputVertex, demuxVertex, edgeConf.createDefaultEdgeProperty())); } - private DAG createDagWithUnion(TezConfiguration tezConf, List inputPaths, Path outputPath, - int numPartitions) { - Vertex[] inputVertices = new Vertex[inputPaths.size()]; - for (int i = 0; i < inputPaths.size(); i++) { - inputVertices[i] = createInputVertex(tezConf, "input-" + i, inputPaths.get(i)); - } - - Vertex demuxVertex = createDemuxVertex(tezConf, outputPath, numPartitions); - - OrderedPartitionedKVEdgeConfig edgeConf = OrderedPartitionedKVEdgeConfig - .newBuilder(Text.class.getName(), NullWritable.class.getName(), - HashPartitioner.class.getName()) - .setFromConfiguration(tezConf) - .build(); - - DAG dag = DAG.create("Demuxer"); - Arrays.stream(inputVertices).forEach(dag::addVertex); - return dag - .addVertex(demuxVertex) - .addEdge( - GroupInputEdge.create( - dag.createVertexGroup("union", inputVertices), - demuxVertex, - edgeConf.createDefaultEdgeProperty(), - InputDescriptor.create(ConcatenatedMergedKeyValuesInput.class.getName()))); - } - private Vertex createInputVertex(TezConfiguration tezConf, String vertexName, Path path) { // This vertex represents an input vertex for the demuxer. It reads text data using the // TextInputFormat. ForwardingProcessor simply forwards the data downstream as is.