Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -24,12 +24,16 @@
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;

import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.tools.Main;
import org.apache.parquet.tools.read.SimpleReadSupport;
import org.apache.parquet.tools.read.SimpleRecord;
import org.apache.parquet.tools.json.JsonRecordFormatter;

public class CatCommand extends ArgsOnlyCommand {
public static final String[] USAGE = new String[] {
Expand Down Expand Up @@ -71,9 +75,12 @@ public void execute(CommandLine options) throws Exception {
try {
PrintWriter writer = new PrintWriter(Main.out, true);
reader = ParquetReader.builder(new SimpleReadSupport(), new Path(input)).build();
ParquetMetadata metadata = ParquetFileReader.readFooter(new Configuration(), new Path(input));
JsonRecordFormatter.JsonGroupFormatter formatter = JsonRecordFormatter.fromSchema(metadata.getFileMetaData().getSchema());

for (SimpleRecord value = reader.read(); value != null; value = reader.read()) {
if (options.hasOption('j')) {
value.prettyPrintJson(writer);
writer.write(formatter.formatRecord(value));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you explain how this was missing records? Was it a bug in the prettyPrintJson method?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The primary issue is that SimpleObject doesn't understand the concept of an array. It simulates an array by adding multiple items to the same key. The problem with that approach is that the Json representation then depends on the amount of data written. A key with only one value would appear as a singleton-object, while items with multiple entries appended to the same key would appear as an array. This would result in an inconsistent json encoding.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reading you comment again I think I misunderstood your question. Basically, the issue is with prettyPrintJson:

protected Object toJsonObject() {
    Map<String, Object> result = Maps.newLinkedHashMap();
    for (NameValue value : values) {
      // This doesn't work because there might be  multiple values with the same key - OVERWRITE.
      // Could change this to a Map<String, Collection> but then we have the same problem in reverse
      // that is, most entries will have only one entry while other items may be arrays. We could
      // just print single-item collections as non-arrays, and multiple item collections as arrays
      // but that's very dirty. Doing this sort of formatting here breaks the 'Single Responsibility Principle' (TM) 
      // anyway so might as well pull the functionality out and do it correctly in a separate location. 
      result.put(value.getName(), toJsonValue(value.getValue()));
    }

    return result;
  }

  protected static Object toJsonValue(Object val) {
    if (SimpleRecord.class.isAssignableFrom(val.getClass())) {
      return ((SimpleRecord) val).toJsonObject();
    } else if (byte[].class == val.getClass()) {
      return new BinaryNode((byte[]) val);
    } else {
      return val;
    }
  }

It's the same basic issue I outlined in my previous comment (but I was addressing the design choice rather than why it was failing in the first place - which was your actual question I think). Please see the comment above.

Thanks

} else {
value.prettyPrint(writer);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.parquet.tools.json;

import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
import org.apache.parquet.tools.read.SimpleRecord;
import org.codehaus.jackson.map.ObjectMapper;

import java.io.IOException;
import java.util.*;

public abstract class JsonRecordFormatter<T> {
private static final int SINGLE_VALUE = 0;

public static class JsonPrimitiveWriter extends JsonRecordFormatter<Object> {

public JsonPrimitiveWriter(Type primitiveType) {
super(primitiveType);
}

@Override
protected Object formatResults(List<Object> listOfValues) {
if (super.typeInfo.getRepetition() == Type.Repetition.REPEATED) {
return listOfValues;
} else {
return listOfValues.get(SINGLE_VALUE);
}
}
}

public static class JsonGroupFormatter extends JsonRecordFormatter<SimpleRecord> {
private final Map<String, JsonRecordFormatter> formatters;

public JsonGroupFormatter(GroupType schema) {
super(schema);

this.formatters = buildWriters(schema);
}

private Map<String, JsonRecordFormatter> buildWriters(GroupType groupSchema) {
Map<String, JsonRecordFormatter> writers = new LinkedHashMap<String, JsonRecordFormatter>();
for (Type type : groupSchema.getFields()) {
if (type.isPrimitive()) {
writers.put(type.getName(), new JsonPrimitiveWriter(type));
} else {
writers.put(type.getName(), new JsonGroupFormatter((GroupType) type));
}
}

return writers;
}

private Object add(SimpleRecord record) {
return formatEntries(collateEntries(record));
}

private Map<String, List<Object>> collateEntries(SimpleRecord record) {
Map<String, List<Object>> collatedEntries = new LinkedHashMap<String, List<Object>>();
for (SimpleRecord.NameValue value : record.getValues()) {
if (collatedEntries.containsKey(value.getName())) {
collatedEntries.get(value.getName()).add(value.getValue());
} else {
List<Object> newResultListForKey = new ArrayList<Object>();
newResultListForKey.add(value.getValue());
collatedEntries.put(value.getName(), newResultListForKey);
}
}

return collatedEntries;
}

private Object formatEntries(Map<String, List<Object>> entries) {
Map<String, Object> results = new LinkedHashMap<String, Object>();
for (Map.Entry<String, List<Object>> entry : entries.entrySet()) {
JsonRecordFormatter formatter = formatters.get(entry.getKey());
results.put(entry.getKey(), formatter.formatResults(entry.getValue()));
}

return results;
}

@Override
protected Object formatResults(List<SimpleRecord> values) {
if (super.typeInfo.getRepetition() == Type.Repetition.REPEATED) {
List<Object> results = new ArrayList<Object>();
for (SimpleRecord object : values) {
results.add(add(object));
}

return results;
} else {
return add(values.get(SINGLE_VALUE));
}
}

public String formatRecord(SimpleRecord value) throws IOException {
ObjectMapper mapper = new ObjectMapper();
return mapper.writeValueAsString(add(value));
}
}

protected final Type typeInfo;

protected JsonRecordFormatter(Type type) {
this.typeInfo = type;
}

protected abstract Object formatResults(List<T> values);

public static JsonGroupFormatter fromSchema(MessageType messageType) {
return new JsonGroupFormatter(messageType);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ protected Object toJsonObject() {
for (NameValue value : values) {
result.put(value.getName(), toJsonValue(value.getValue()));
}

return result;
}

Expand Down
Loading