Skip to content

Commit fdbe03f

Browse files
committed
ARROW-367: converter json <=> Arrow file format for Integration tests
1 parent 4fa7ac4 commit fdbe03f

File tree

6 files changed

+500
-114
lines changed

6 files changed

+500
-114
lines changed
Lines changed: 244 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,244 @@
1+
package org.apache.arrow.tools;
2+
3+
import java.io.File;
4+
import java.io.FileInputStream;
5+
import java.io.FileOutputStream;
6+
import java.io.IOException;
7+
import java.util.Arrays;
8+
import java.util.Iterator;
9+
import java.util.List;
10+
11+
import org.apache.arrow.memory.BufferAllocator;
12+
import org.apache.arrow.memory.RootAllocator;
13+
import org.apache.arrow.vector.FieldVector;
14+
import org.apache.arrow.vector.VectorLoader;
15+
import org.apache.arrow.vector.VectorSchemaRoot;
16+
import org.apache.arrow.vector.VectorUnloader;
17+
import org.apache.arrow.vector.file.ArrowBlock;
18+
import org.apache.arrow.vector.file.ArrowFooter;
19+
import org.apache.arrow.vector.file.ArrowReader;
20+
import org.apache.arrow.vector.file.ArrowWriter;
21+
import org.apache.arrow.vector.file.json.JsonFileReader;
22+
import org.apache.arrow.vector.file.json.JsonFileWriter;
23+
import org.apache.arrow.vector.schema.ArrowRecordBatch;
24+
import org.apache.arrow.vector.types.pojo.Field;
25+
import org.apache.arrow.vector.types.pojo.Schema;
26+
import org.apache.commons.cli.CommandLine;
27+
import org.apache.commons.cli.CommandLineParser;
28+
import org.apache.commons.cli.Options;
29+
import org.apache.commons.cli.ParseException;
30+
import org.apache.commons.cli.PosixParser;
31+
import org.slf4j.Logger;
32+
import org.slf4j.LoggerFactory;
33+
34+
import com.google.common.base.Objects;
35+
36+
public class Integration {
37+
private static final Logger LOGGER = LoggerFactory.getLogger(Integration.class);
38+
39+
public static void main(String[] args) {
40+
try {
41+
new Integration().run(args);
42+
} catch (ParseException e) {
43+
fatalError("Invalid parameters", e);
44+
} catch (IOException e) {
45+
fatalError("Error accessing files", e);
46+
} catch (RuntimeException e) {
47+
fatalError("Incompatible files", e);
48+
}
49+
}
50+
51+
private final Options options;
52+
53+
enum Command {
54+
ARROW_TO_JSON(true, false) {
55+
@Override
56+
public void execute(File arrowFile, File jsonFile) throws IOException {
57+
try(
58+
BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
59+
FileInputStream fileInputStream = new FileInputStream(arrowFile);
60+
ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), allocator);) {
61+
ArrowFooter footer = arrowReader.readFooter();
62+
Schema schema = footer.getSchema();
63+
LOGGER.debug("Input file size: " + arrowFile.length());
64+
LOGGER.debug("Found schema: " + schema);
65+
try (JsonFileWriter writer = new JsonFileWriter(jsonFile);) {
66+
writer.start(schema);
67+
List<ArrowBlock> recordBatches = footer.getRecordBatches();
68+
for (ArrowBlock rbBlock : recordBatches) {
69+
try (ArrowRecordBatch inRecordBatch = arrowReader.readRecordBatch(rbBlock);
70+
VectorSchemaRoot root = new VectorSchemaRoot(schema, allocator);) {
71+
VectorLoader vectorLoader = new VectorLoader(root);
72+
vectorLoader.load(inRecordBatch);
73+
writer.write(root);
74+
}
75+
}
76+
}
77+
LOGGER.debug("Output file size: " + jsonFile.length());
78+
}
79+
}
80+
},
81+
JSON_TO_ARROW(false, true) {
82+
@Override
83+
public void execute(File arrowFile, File jsonFile) throws IOException {
84+
try (
85+
BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
86+
JsonFileReader reader = new JsonFileReader(jsonFile, allocator);
87+
) {
88+
Schema schema = reader.start();
89+
LOGGER.debug("Input file size: " + jsonFile.length());
90+
LOGGER.debug("Found schema: " + schema);
91+
try (
92+
FileOutputStream fileOutputStream = new FileOutputStream(arrowFile);
93+
ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema);
94+
) {
95+
96+
// initialize vectors
97+
VectorSchemaRoot root;
98+
while ((root = reader.read()) != null) {
99+
VectorUnloader vectorUnloader = new VectorUnloader(root);
100+
try (ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch();) {
101+
arrowWriter.writeRecordBatch(recordBatch);
102+
}
103+
root.close();
104+
}
105+
}
106+
LOGGER.debug("Output file size: " + arrowFile.length());
107+
}
108+
}
109+
},
110+
VALIDATE(true, true) {
111+
@Override
112+
public void execute(File arrowFile, File jsonFile) throws IOException {
113+
try (
114+
BufferAllocator allocator = new RootAllocator(Integer.MAX_VALUE);
115+
JsonFileReader jsonReader = new JsonFileReader(jsonFile, allocator);
116+
FileInputStream fileInputStream = new FileInputStream(arrowFile);
117+
ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), allocator);
118+
) {
119+
Schema jsonSchema = jsonReader.start();
120+
ArrowFooter footer = arrowReader.readFooter();
121+
Schema arrowSchema = footer.getSchema();
122+
LOGGER.debug("Arrow Input file size: " + arrowFile.length());
123+
LOGGER.debug("ARROW schema: " + arrowSchema);
124+
LOGGER.debug("JSON Input file size: " + jsonFile.length());
125+
LOGGER.debug("JSON schema: " + jsonSchema);
126+
compareSchemas(jsonSchema, arrowSchema);
127+
128+
List<ArrowBlock> recordBatches = footer.getRecordBatches();
129+
Iterator<ArrowBlock> iterator = recordBatches.iterator();
130+
VectorSchemaRoot jsonRoot;
131+
while ((jsonRoot = jsonReader.read()) != null && iterator.hasNext()) {
132+
ArrowBlock rbBlock = iterator.next();
133+
try (ArrowRecordBatch inRecordBatch = arrowReader.readRecordBatch(rbBlock);
134+
VectorSchemaRoot arrowRoot = new VectorSchemaRoot(arrowSchema, allocator);) {
135+
VectorLoader vectorLoader = new VectorLoader(arrowRoot);
136+
vectorLoader.load(inRecordBatch);
137+
// TODO: compare
138+
compare(arrowRoot, jsonRoot);
139+
}
140+
jsonRoot.close();
141+
}
142+
boolean hasMoreJSON = jsonRoot != null;
143+
boolean hasMoreArrow = iterator.hasNext();
144+
if (hasMoreJSON || hasMoreArrow) {
145+
throw new IllegalArgumentException("Unexpected RecordBatches. J:" + hasMoreJSON + " A:" + hasMoreArrow);
146+
}
147+
}
148+
}
149+
};
150+
151+
public final boolean arrowExists;
152+
public final boolean jsonExists;
153+
154+
Command(boolean arrowExists, boolean jsonExists) {
155+
this.arrowExists = arrowExists;
156+
this.jsonExists = jsonExists;
157+
}
158+
159+
abstract public void execute(File arrowFile, File jsonFile) throws IOException;
160+
161+
}
162+
163+
Integration() {
164+
this.options = new Options();
165+
this.options.addOption("a", "arrow", true, "arrow file");
166+
this.options.addOption("j", "json", true, "json file");
167+
this.options.addOption("c", "command", true, "command to execute: " + Arrays.toString(Command.values()));
168+
}
169+
170+
private File validateFile(String type, String fileName, boolean shouldExist) {
171+
if (fileName == null) {
172+
throw new IllegalArgumentException("missing " + type + " file parameter");
173+
}
174+
File f = new File(fileName);
175+
if (shouldExist && (!f.exists() || f.isDirectory())) {
176+
throw new IllegalArgumentException(type + " file not found: " + f.getAbsolutePath());
177+
}
178+
if (!shouldExist && f.exists()) {
179+
throw new IllegalArgumentException(type + " file already exists: " + f.getAbsolutePath());
180+
}
181+
return f;
182+
}
183+
184+
void run(String[] args) throws ParseException, IOException {
185+
CommandLineParser parser = new PosixParser();
186+
CommandLine cmd = parser.parse(options, args, false);
187+
188+
189+
Command command = toCommand(cmd.getOptionValue("command"));
190+
File arrowFile = validateFile("arrow", cmd.getOptionValue("arrow"), command.arrowExists);
191+
File jsonFile = validateFile("json", cmd.getOptionValue("json"), command.jsonExists);
192+
command.execute(arrowFile, jsonFile);
193+
}
194+
195+
private Command toCommand(String commandName) {
196+
try {
197+
return Command.valueOf(commandName);
198+
} catch (IllegalArgumentException e) {
199+
throw new IllegalArgumentException("Unknown command: " + commandName + " expected one of " + Arrays.toString(Command.values()));
200+
}
201+
}
202+
203+
private static void fatalError(String message, Throwable e) {
204+
System.err.println(message);
205+
LOGGER.error(message, e);
206+
System.exit(1);
207+
}
208+
209+
210+
private static void compare(VectorSchemaRoot arrowRoot, VectorSchemaRoot jsonRoot) {
211+
compareSchemas(jsonRoot.getSchema(), arrowRoot.getSchema());
212+
if (arrowRoot.getRowCount() != jsonRoot.getRowCount()) {
213+
throw new IllegalArgumentException("Different row count:\n" + arrowRoot.getRowCount() + "\n" + jsonRoot.getRowCount());
214+
}
215+
List<FieldVector> arrowVectors = arrowRoot.getFieldVectors();
216+
List<FieldVector> jsonVectors = jsonRoot.getFieldVectors();
217+
if (arrowVectors.size() != jsonVectors.size()) {
218+
throw new IllegalArgumentException("Different column count:\n" + arrowVectors.size() + "\n" + jsonVectors.size());
219+
}
220+
for (int i = 0; i < arrowVectors.size(); i++) {
221+
Field field = arrowRoot.getSchema().getFields().get(i);
222+
FieldVector arrowVector = arrowVectors.get(i);
223+
FieldVector jsonVector = jsonVectors.get(i);
224+
int valueCount = arrowVector.getAccessor().getValueCount();
225+
if (valueCount != jsonVector.getAccessor().getValueCount()) {
226+
throw new IllegalArgumentException("Different value count for field " + field + " : " + valueCount + " != " + jsonVector.getAccessor().getValueCount());
227+
}
228+
for (int j = 0; j < valueCount; j++) {
229+
Object arrow = arrowVector.getAccessor().getObject(j);
230+
Object json = jsonVector.getAccessor().getObject(j);
231+
if (!Objects.equal(arrow, json)) {
232+
throw new IllegalArgumentException(
233+
"Different values in column:\n" + field + " at index " + j + ": " + arrow + " != " + json);
234+
}
235+
}
236+
}
237+
}
238+
239+
private static void compareSchemas(Schema jsonSchema, Schema arrowSchema) {
240+
if (!arrowSchema.equals(jsonSchema)) {
241+
throw new IllegalArgumentException("Different schemas:\n" + arrowSchema + "\n" + jsonSchema);
242+
}
243+
}
244+
}
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
package org.apache.arrow.tools;
2+
3+
import java.io.File;
4+
import java.io.FileInputStream;
5+
import java.io.FileNotFoundException;
6+
import java.io.FileOutputStream;
7+
import java.io.IOException;
8+
import java.util.List;
9+
10+
import org.apache.arrow.memory.BufferAllocator;
11+
import org.apache.arrow.vector.FieldVector;
12+
import org.apache.arrow.vector.VectorLoader;
13+
import org.apache.arrow.vector.VectorSchemaRoot;
14+
import org.apache.arrow.vector.VectorUnloader;
15+
import org.apache.arrow.vector.complex.MapVector;
16+
import org.apache.arrow.vector.complex.impl.ComplexWriterImpl;
17+
import org.apache.arrow.vector.complex.writer.BaseWriter.ComplexWriter;
18+
import org.apache.arrow.vector.complex.writer.BaseWriter.MapWriter;
19+
import org.apache.arrow.vector.complex.writer.BigIntWriter;
20+
import org.apache.arrow.vector.complex.writer.IntWriter;
21+
import org.apache.arrow.vector.file.ArrowBlock;
22+
import org.apache.arrow.vector.file.ArrowFooter;
23+
import org.apache.arrow.vector.file.ArrowReader;
24+
import org.apache.arrow.vector.file.ArrowWriter;
25+
import org.apache.arrow.vector.schema.ArrowRecordBatch;
26+
import org.apache.arrow.vector.types.pojo.Schema;
27+
import org.junit.Assert;
28+
29+
public class ArrowFileTestFixtures {
30+
static final int COUNT = 10;
31+
32+
static void writeData(int count, MapVector parent) {
33+
ComplexWriter writer = new ComplexWriterImpl("root", parent);
34+
MapWriter rootWriter = writer.rootAsMap();
35+
IntWriter intWriter = rootWriter.integer("int");
36+
BigIntWriter bigIntWriter = rootWriter.bigInt("bigInt");
37+
for (int i = 0; i < count; i++) {
38+
intWriter.setPosition(i);
39+
intWriter.writeInt(i);
40+
bigIntWriter.setPosition(i);
41+
bigIntWriter.writeBigInt(i);
42+
}
43+
writer.setValueCount(count);
44+
}
45+
46+
static void validateOutput(File testOutFile, BufferAllocator allocator) throws Exception {
47+
// read
48+
try (
49+
BufferAllocator readerAllocator = allocator.newChildAllocator("reader", 0, Integer.MAX_VALUE);
50+
FileInputStream fileInputStream = new FileInputStream(testOutFile);
51+
ArrowReader arrowReader = new ArrowReader(fileInputStream.getChannel(), readerAllocator);
52+
BufferAllocator vectorAllocator = allocator.newChildAllocator("final vectors", 0, Integer.MAX_VALUE);
53+
) {
54+
ArrowFooter footer = arrowReader.readFooter();
55+
Schema schema = footer.getSchema();
56+
57+
// initialize vectors
58+
try (VectorSchemaRoot root = new VectorSchemaRoot(schema, readerAllocator)) {
59+
VectorLoader vectorLoader = new VectorLoader(root);
60+
61+
List<ArrowBlock> recordBatches = footer.getRecordBatches();
62+
for (ArrowBlock rbBlock : recordBatches) {
63+
try (ArrowRecordBatch recordBatch = arrowReader.readRecordBatch(rbBlock)) {
64+
vectorLoader.load(recordBatch);
65+
}
66+
validateContent(COUNT, root);
67+
}
68+
}
69+
}
70+
}
71+
72+
static void validateContent(int count, VectorSchemaRoot root) {
73+
Assert.assertEquals(count, root.getRowCount());
74+
for (int i = 0; i < count; i++) {
75+
Assert.assertEquals(i, root.getVector("int").getAccessor().getObject(i));
76+
Assert.assertEquals(Long.valueOf(i), root.getVector("bigInt").getAccessor().getObject(i));
77+
}
78+
}
79+
80+
static void write(FieldVector parent, File file) throws FileNotFoundException, IOException {
81+
Schema schema = new Schema(parent.getField().getChildren());
82+
int valueCount = parent.getAccessor().getValueCount();
83+
List<FieldVector> fields = parent.getChildrenFromFields();
84+
VectorUnloader vectorUnloader = new VectorUnloader(schema, valueCount, fields);
85+
try (
86+
FileOutputStream fileOutputStream = new FileOutputStream(file);
87+
ArrowWriter arrowWriter = new ArrowWriter(fileOutputStream.getChannel(), schema);
88+
ArrowRecordBatch recordBatch = vectorUnloader.getRecordBatch();
89+
) {
90+
arrowWriter.writeRecordBatch(recordBatch);
91+
}
92+
}
93+
94+
95+
static void writeInput(File testInFile, BufferAllocator allocator) throws FileNotFoundException, IOException {
96+
int count = ArrowFileTestFixtures.COUNT;
97+
try (
98+
BufferAllocator vectorAllocator = allocator.newChildAllocator("original vectors", 0, Integer.MAX_VALUE);
99+
MapVector parent = new MapVector("parent", vectorAllocator, null)) {
100+
writeData(count, parent);
101+
write(parent.getChild("root"), testInFile);
102+
}
103+
}
104+
}

0 commit comments

Comments
 (0)