Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<version>4.10</version>
<scope>test</scope>
</dependency>
</dependencies>
Expand Down
142 changes: 142 additions & 0 deletions src/main/java/parquet/format/Util.java
Original file line number Diff line number Diff line change
@@ -1,15 +1,34 @@
package parquet.format;

import static parquet.format.FileMetaData._Fields.CREATED_BY;
import static parquet.format.FileMetaData._Fields.KEY_VALUE_METADATA;
import static parquet.format.FileMetaData._Fields.NUM_ROWS;
import static parquet.format.FileMetaData._Fields.ROW_GROUPS;
import static parquet.format.FileMetaData._Fields.SCHEMA;
import static parquet.format.FileMetaData._Fields.VERSION;
import static parquet.format.event.Consumers.fieldConsumer;
import static parquet.format.event.Consumers.listElementsOf;
import static parquet.format.event.Consumers.listOf;
import static parquet.format.event.Consumers.struct;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.List;

import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.apache.thrift.protocol.TCompactProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TIOStreamTransport;

import parquet.format.event.Consumers.Consumer;
import parquet.format.event.Consumers.DelegatingFieldConsumer;
import parquet.format.event.EventBasedThriftReader;
import parquet.format.event.TypedConsumer.I32Consumer;
import parquet.format.event.TypedConsumer.I64Consumer;
import parquet.format.event.TypedConsumer.StringConsumer;

/**
* Utility to read/write metadata
* We use the TCompactProtocol to serialize metadata
Expand All @@ -34,6 +53,129 @@ public static void writeFileMetaData(parquet.format.FileMetaData fileMetadata, O
/**
 * reads the whole meta data from the stream into a new FileMetaData
 * @param from the stream to read the metadata from
 * @return the result of reading into the new FileMetaData
 * @throws IOException
 */
public static FileMetaData readFileMetaData(InputStream from) throws IOException {
  FileMetaData target = new FileMetaData();
  return read(from, target);
}
/**
 * reads the meta data from the stream, optionally leaving out the row groups
 * @param from the stream to read the metadata from
 * @param skipRowGroups whether row groups should be skipped
 * @return the resulting metadata
 * @throws IOException
 */
public static FileMetaData readFileMetaData(InputStream from, boolean skipRowGroups) throws IOException {
  FileMetaData result = new FileMetaData();
  if (!skipRowGroups) {
    // nothing to leave out: plain read into the struct
    read(from, result);
    return result;
  }
  // streaming read that fills the struct through the default consumer, without the row groups
  readFileMetaData(from, new DefaultFileMetaDataConsumer(result), true);
  return result;
}

/**
 * Callbacks to receive the content of a FileMetaData in a streaming fashion.
 *
 * @author Julien Le Dem
 *
 */
public abstract static class FileMetaDataConsumer {

  /** @param version the value read for the version field */
  public abstract void setVersion(int version);

  /** @param schema the complete list of schema elements */
  public abstract void setSchema(List<SchemaElement> schema);

  /** @param numRows the value read for the num_rows field */
  public abstract void setNumRows(long numRows);

  /** @param createdBy the value read for the created_by field */
  public abstract void setCreatedBy(String createdBy);

  /** @param rowGroup one element of the row groups list */
  public abstract void addRowGroup(RowGroup rowGroup);

  /** @param kv one element of the key value metadata list */
  public abstract void addKeyValueMetaData(KeyValue kv);
}

/**
 * Default consumer: every value received is stored in the FileMetaData
 * instance provided at construction.
 *
 * @author Julien Le Dem
 *
 */
public static final class DefaultFileMetaDataConsumer extends FileMetaDataConsumer {

  /** the metadata object being populated */
  private final FileMetaData target;

  /**
   * @param md the metadata object that will receive the values
   */
  public DefaultFileMetaDataConsumer(FileMetaData md) {
    this.target = md;
  }

  // --- single valued fields ---

  @Override
  public void setVersion(int version) {
    target.setVersion(version);
  }

  @Override
  public void setNumRows(long numRows) {
    target.setNum_rows(numRows);
  }

  @Override
  public void setCreatedBy(String createdBy) {
    target.setCreated_by(createdBy);
  }

  @Override
  public void setSchema(List<SchemaElement> schema) {
    target.setSchema(schema);
  }

  // --- fields received one element at a time ---

  @Override
  public void addKeyValueMetaData(KeyValue kv) {
    target.addToKey_value_metadata(kv);
  }

  @Override
  public void addRowGroup(RowGroup rowGroup) {
    target.addToRow_groups(rowGroup);
  }
}

/**
 * reads the meta data from the stream and passes it to the consumer, row groups included
 * @param from the stream to read the metadata from
 * @param consumer the consumer receiving the metadata
 * @throws IOException
 */
public static void readFileMetaData(InputStream from, FileMetaDataConsumer consumer) throws IOException {
  final boolean skipRowGroups = false;
  readFileMetaData(from, consumer, skipRowGroups);
}

/**
 * reads the meta data from the stream with the event based reader and
 * forwards each field to the consumer
 * @param from the stream to read the metadata from
 * @param consumer the consumer receiving the metadata
 * @param skipRowGroups when true no handler is registered for the row groups field
 * @throws IOException wrapping the TException when reading fails
 */
public static void readFileMetaData(InputStream from, final FileMetaDataConsumer consumer, boolean skipRowGroups) throws IOException {
  try {
    // one handler per field, each one forwarding to the consumer
    final I32Consumer versionHandler = new I32Consumer() {
      @Override
      public void consume(int value) {
        consumer.setVersion(value);
      }
    };
    final Consumer<List<SchemaElement>> schemaHandler = new Consumer<List<SchemaElement>>() {
      @Override
      public void consume(List<SchemaElement> schema) {
        consumer.setSchema(schema);
      }
    };
    final I64Consumer numRowsHandler = new I64Consumer() {
      @Override
      public void consume(long value) {
        consumer.setNumRows(value);
      }
    };
    final Consumer<KeyValue> keyValueHandler = new Consumer<KeyValue>() {
      @Override
      public void consume(KeyValue kv) {
        consumer.addKeyValueMetaData(kv);
      }
    };
    final StringConsumer createdByHandler = new StringConsumer() {
      @Override
      public void consume(String value) {
        consumer.setCreatedBy(value);
      }
    };

    DelegatingFieldConsumer fields = fieldConsumer()
        .onField(VERSION, versionHandler)
        .onField(SCHEMA, listOf(SchemaElement.class, schemaHandler))
        .onField(NUM_ROWS, numRowsHandler)
        .onField(KEY_VALUE_METADATA, listElementsOf(struct(KeyValue.class, keyValueHandler)))
        .onField(CREATED_BY, createdByHandler);

    // row groups are only handled on demand
    if (!skipRowGroups) {
      final Consumer<RowGroup> rowGroupHandler = new Consumer<RowGroup>() {
        @Override
        public void consume(RowGroup rowGroup) {
          consumer.addRowGroup(rowGroup);
        }
      };
      fields = fields.onField(ROW_GROUPS, listElementsOf(struct(RowGroup.class, rowGroupHandler)));
    }

    new EventBasedThriftReader(protocol(from)).readStruct(fields);
  } catch (TException e) {
    throw new IOException("can not read FileMetaData: " + e.getMessage(), e);
  }
}

private static TProtocol protocol(OutputStream to) {
return protocol(new TIOStreamTransport(to));
Expand Down
181 changes: 181 additions & 0 deletions src/main/java/parquet/format/event/Consumers.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
package parquet.format.event;

import static java.util.Collections.unmodifiableMap;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.thrift.TBase;
import org.apache.thrift.TException;
import org.apache.thrift.TFieldIdEnum;
import org.apache.thrift.protocol.TList;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.protocol.TProtocolUtil;

import parquet.format.event.Consumers.Consumer;
import parquet.format.event.TypedConsumer.BoolConsumer;
import parquet.format.event.TypedConsumer.ListConsumer;
import parquet.format.event.TypedConsumer.StructConsumer;

/**
* Entry point for reading thrift in a streaming fashion
*
* @author Julien Le Dem
*
*/
public class Consumers {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It seems that Consumers is only used as a namespace here. Should it be a package instead?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Never mind, I missed that it also provides some static utility methods.


/**
 * To consume objects coming from a DelegatingFieldConsumer.
 * Implementations are also passed to {@link Consumers#listOf(Class, Consumer)}
 * and {@link Consumers#struct(Class, Consumer)} to receive the objects read.
 * @author Julien Le Dem
 *
 * @param <T> the type of consumed objects
 */
public static interface Consumer<T> {
/**
 * Receives one object.
 * @param t the object to consume
 */
void consume(T t);
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you add a comment such as: "The DelegatingFieldConsumer delegates the addField call to the corresponding TypedConsumer"?

/**
 * Dispatches each field to the TypedConsumer registered for its thrift field id.
 * Fields with no registered TypedConsumer go to the default FieldConsumer.
 * Instances are not modified by {@link DelegatingFieldConsumer#onField(TFieldIdEnum, TypedConsumer)}:
 * each call returns a new instance with the additional registration.
 * @see Consumers#fieldConsumer()
 * @author Julien Le Dem
 *
 */
public static class DelegatingFieldConsumer implements FieldConsumer {

  /** receives the fields that have no registered handler */
  private final FieldConsumer fallback;
  /** registered handlers by thrift field id */
  private final Map<Short, TypedConsumer> handlersById;

  private DelegatingFieldConsumer() {
    this(new SkippingFieldConsumer());
  }

  private DelegatingFieldConsumer(FieldConsumer defaultFieldEventConsumer) {
    this(defaultFieldEventConsumer, Collections.<Short, TypedConsumer>emptyMap());
  }

  private DelegatingFieldConsumer(FieldConsumer defaultFieldEventConsumer, Map<Short, TypedConsumer> contexts) {
    this.fallback = defaultFieldEventConsumer;
    this.handlersById = unmodifiableMap(contexts);
  }

  /**
   * @param e the field to handle
   * @param typedConsumer the consumer that will read that field
   * @return a new DelegatingFieldConsumer with this registration added
   */
  public DelegatingFieldConsumer onField(TFieldIdEnum e, TypedConsumer typedConsumer) {
    Map<Short, TypedConsumer> extended = new HashMap<Short, TypedConsumer>(handlersById);
    extended.put(e.getThriftFieldId(), typedConsumer);
    return new DelegatingFieldConsumer(fallback, extended);
  }

  @Override
  public void consumeField(TProtocol protocol, EventBasedThriftReader reader, short id, byte type) throws TException {
    TypedConsumer handler = handlersById.get(id);
    if (handler == null) {
      // no handler registered for this field id
      fallback.consumeField(protocol, reader, id, type);
      return;
    }
    handler.read(protocol, reader, type);
  }
}

/**
 * Starting point to define how fields are handled: call onField on the
 * result for each field of interest.
 * @return a new DelegatingFieldConsumer with no field registered
 */
public static DelegatingFieldConsumer fieldConsumer() {
  DelegatingFieldConsumer empty = new DelegatingFieldConsumer();
  return empty;
}

/**
 * To consume a list as a whole: the elements are accumulated and the
 * complete list is handed over once it has been read.
 * @param c the type of the list content
 * @param consumer the consumer that will receive the list
 * @return a ListConsumer that can be passed to the DelegatingFieldConsumer
 */
public static <T extends TBase<T,? extends TFieldIdEnum>> ListConsumer listOf(Class<T> c, final Consumer<List<T>> consumer) {
  // accumulates the elements of the list being read
  class ElementCollector implements Consumer<T> {
    List<T> elements;
    @Override
    public void consume(T element) {
      elements.add(element);
    }
  }
  final ElementCollector collector = new ElementCollector();
  StructConsumer elementReader = struct(c, collector);
  return new DelegatingListElementsConsumer(elementReader) {
    @Override
    public void consumeList(TProtocol protocol, EventBasedThriftReader reader, TList tList) throws TException {
      // a fresh list for every list read
      collector.elements = new ArrayList<T>();
      super.consumeList(protocol, reader, tList);
      consumer.consume(collector.elements);
    }
  };
}

/**
 * To consume the elements of a list individually
 * @param consumer the consumer that will read each element
 * @return a ListConsumer that can be passed to the DelegatingFieldConsumer
 */
public static ListConsumer listElementsOf(TypedConsumer consumer) {
  ListConsumer perElement = new DelegatingListElementsConsumer(consumer);
  return perElement;
}

/**
 * To consume a struct through a TBaseStructConsumer
 * @param c the class of the struct
 * @param consumer the consumer that will receive the struct
 * @return a StructConsumer backed by a TBaseStructConsumer
 */
public static <T extends TBase<T,? extends TFieldIdEnum>> StructConsumer struct(final Class<T> c, final Consumer<T> consumer) {
  StructConsumer structReader = new TBaseStructConsumer<T>(c, consumer);
  return structReader;
}
}

/**
 * FieldConsumer that ignores the field it receives: the field is skipped
 * on the protocol according to its type.
 */
class SkippingFieldConsumer implements FieldConsumer {

  @Override
  public void consumeField(
      TProtocol protocol, EventBasedThriftReader reader,
      short fieldId, byte fieldType) throws TException {
    // neither the reader nor the id is needed to skip
    TProtocolUtil.skip(protocol, fieldType);
  }
}

/**
 * ListConsumer that hands every element of the list to a single TypedConsumer.
 */
class DelegatingListElementsConsumer extends ListConsumer {

  /** reads each element of the list */
  private final TypedConsumer delegate;

  /**
   * @param consumer the consumer that will read the elements
   */
  protected DelegatingListElementsConsumer(TypedConsumer consumer) {
    delegate = consumer;
  }

  @Override
  public void consumeElement(
      TProtocol protocol, EventBasedThriftReader reader,
      byte elemType) throws TException {
    delegate.read(protocol, reader, elemType);
  }
}
/**
 * StructConsumer that reads each struct into a new instance of the given
 * class and passes it to a Consumer.
 *
 * @param <T> the type of the struct
 */
class TBaseStructConsumer<T extends TBase<T, ? extends TFieldIdEnum>> extends StructConsumer {

  /** the class instantiated for each struct */
  private final Class<T> structClass;
  /** receives each struct once it has been read */
  private final Consumer<T> sink;

  /**
   * @param c the class of the struct, instantiated through its no-arg constructor
   * @param consumer the consumer that will receive the structs
   */
  public TBaseStructConsumer(Class<T> c, Consumer<T> consumer) {
    this.structClass = c;
    this.sink = consumer;
  }

  @Override
  public void consumeStruct(TProtocol protocol, EventBasedThriftReader reader) throws TException {
    T struct = newObject();
    struct.read(protocol);
    sink.consume(struct);
  }

  /**
   * @return a new empty instance of the struct
   * @throws RuntimeException if the class can not be instantiated
   */
  protected T newObject() {
    T instance;
    try {
      instance = structClass.newInstance();
    } catch (InstantiationException e) {
      throw new RuntimeException(structClass.getName(), e);
    } catch (IllegalAccessException e) {
      throw new RuntimeException(structClass.getName(), e);
    }
    return instance;
  }

}
Loading