Skip to content

Commit da28832

Browse files
committed
merge multi parquet files to one file
1 parent c26fa78 commit da28832

File tree

3 files changed

+158
-0
lines changed

3 files changed

+158
-0
lines changed
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.parquet.tools.command;
20+
21+
import org.apache.commons.cli.CommandLine;
22+
import org.apache.hadoop.conf.Configuration;
23+
import org.apache.hadoop.fs.FileStatus;
24+
import org.apache.hadoop.fs.FileSystem;
25+
import org.apache.hadoop.fs.Path;
26+
import org.apache.parquet.hadoop.util.HiddenFileFilter;
27+
import org.slf4j.Logger;
28+
import org.apache.parquet.hadoop.ParquetFileWriter;
29+
import org.apache.parquet.hadoop.metadata.FileMetaData;
30+
import org.slf4j.LoggerFactory;
31+
32+
import java.io.IOException;
33+
import java.util.ArrayList;
34+
import java.util.List;
35+
36+
public class MergeCommand extends ArgsOnlyCommand {
37+
public static final String[] USAGE = new String[] {
38+
"<input> [<input> ...] <output>",
39+
"where <input> is the source parquet files/directory to be merged",
40+
" <output> is the destination parquet file"
41+
};
42+
43+
/**
44+
* Biggest number of input files we can merge.
45+
*/
46+
private static final int MAX_FILE_NUM = 100;
47+
48+
private Configuration conf;
49+
50+
public MergeCommand() {
51+
super(2, MAX_FILE_NUM + 1);
52+
53+
conf = new Configuration();
54+
}
55+
56+
@Override
57+
public String[] getUsageDescription() {
58+
return USAGE;
59+
}
60+
61+
@Override
62+
public void execute(CommandLine options) throws Exception {
63+
// Prepare arguments
64+
List<String> args = options.getArgList();
65+
List<Path> inputFiles = getInputFiles(args.subList(0, args.size() - 1));
66+
Path outputFile = new Path(args.get(args.size() - 1));
67+
68+
// Merge schema and extraMeta
69+
FileMetaData mergedMeta = mergedMetadata(inputFiles);
70+
71+
// Merge data
72+
ParquetFileWriter writer = new ParquetFileWriter(conf,
73+
mergedMeta.getSchema(), outputFile, ParquetFileWriter.Mode.CREATE);
74+
writer.start();
75+
for (Path input: inputFiles) {
76+
writer.appendFile(conf, input);
77+
}
78+
writer.end(mergedMeta.getKeyValueMetaData());
79+
}
80+
81+
private FileMetaData mergedMetadata(List<Path> inputFiles) throws IOException {
82+
return ParquetFileWriter.mergeMetadataFiles(inputFiles, conf).getFileMetaData();
83+
}
84+
85+
/**
86+
* Get all input files.
87+
* @param input input files or directory.
88+
* @return ordered input files.
89+
*/
90+
private List<Path> getInputFiles(List<String> input) throws IOException {
91+
if (input.size() == 1) {
92+
Path p = new Path(input.get(0));
93+
FileSystem fs = p.getFileSystem(conf);
94+
FileStatus status = fs.getFileStatus(p);
95+
96+
if (status.isDir()) {
97+
return getInputFilesFromDirection(status);
98+
}
99+
}
100+
101+
return parseInputFiles(input);
102+
}
103+
104+
/**
105+
* Get all parquet files under partition directory.
106+
* @param partitionDir partition directory.
107+
* @return parquet files to be merged.
108+
*/
109+
private List<Path> getInputFilesFromDirection(FileStatus partitionDir) throws IOException {
110+
FileSystem fs = partitionDir.getPath().getFileSystem(conf);
111+
FileStatus[] inputFiles = fs.listStatus(partitionDir.getPath(), HiddenFileFilter.INSTANCE);
112+
113+
List<Path> input = new ArrayList<Path>();
114+
for (FileStatus f: inputFiles) {
115+
input.add(f.getPath());
116+
}
117+
return input;
118+
}
119+
120+
private List<Path> parseInputFiles(List<String> input) {
121+
List<Path> inputFiles = new ArrayList<Path>();
122+
123+
for (String name: input) {
124+
inputFiles.add(new Path(name));
125+
}
126+
127+
return inputFiles;
128+
}
129+
}

parquet-tools/src/main/java/org/apache/parquet/tools/command/Registry.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ public final class Registry {
3131
registry.put("schema", ShowSchemaCommand.class);
3232
registry.put("meta", ShowMetaCommand.class);
3333
registry.put("dump", DumpCommand.class);
34+
registry.put("merge", MergeCommand.class);
3435
}
3536

3637
public static Map<String,Command> allCommands() {
Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
#!/usr/bin/env bash
2+
#
3+
# Licensed to the Apache Software Foundation (ASF) under one
4+
# or more contributor license agreements. See the NOTICE file
5+
# distributed with this work for additional information
6+
# regarding copyright ownership. The ASF licenses this file
7+
# to you under the Apache License, Version 2.0 (the
8+
# "License"); you may not use this file except in compliance
9+
# with the License. You may obtain a copy of the License at
10+
#
11+
# http://www.apache.org/licenses/LICENSE-2.0
12+
#
13+
# Unless required by applicable law or agreed to in writing,
14+
# software distributed under the License is distributed on an
15+
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
16+
# KIND, either express or implied. See the License for the
17+
# specific language governing permissions and limitations
18+
# under the License.
19+
#
20+
21+
# The name of the top-level script
22+
TOPSCRIPT="parquet-tools"
23+
24+
# Determine the path to the script's directory
25+
APPPATH=$( cd "$(dirname "$0")" ; pwd -P )
26+
27+
# Run the application
28+
exec "${APPPATH}/${TOPSCRIPT}" merge "$@"

0 commit comments

Comments
 (0)