diff --git a/pom.xml b/pom.xml index 0e5bafd06..fe59faf58 100644 --- a/pom.xml +++ b/pom.xml @@ -166,7 +166,7 @@ junit junit - 3.8.1 + 4.10 test diff --git a/src/main/java/parquet/format/Util.java b/src/main/java/parquet/format/Util.java index 9210f37d8..b063e2b6f 100644 --- a/src/main/java/parquet/format/Util.java +++ b/src/main/java/parquet/format/Util.java @@ -1,8 +1,20 @@ package parquet.format; +import static parquet.format.FileMetaData._Fields.CREATED_BY; +import static parquet.format.FileMetaData._Fields.KEY_VALUE_METADATA; +import static parquet.format.FileMetaData._Fields.NUM_ROWS; +import static parquet.format.FileMetaData._Fields.ROW_GROUPS; +import static parquet.format.FileMetaData._Fields.SCHEMA; +import static parquet.format.FileMetaData._Fields.VERSION; +import static parquet.format.event.Consumers.fieldConsumer; +import static parquet.format.event.Consumers.listElementsOf; +import static parquet.format.event.Consumers.listOf; +import static parquet.format.event.Consumers.struct; + import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.util.List; import org.apache.thrift.TBase; import org.apache.thrift.TException; @@ -10,6 +22,13 @@ import org.apache.thrift.protocol.TProtocol; import org.apache.thrift.transport.TIOStreamTransport; +import parquet.format.event.Consumers.Consumer; +import parquet.format.event.Consumers.DelegatingFieldConsumer; +import parquet.format.event.EventBasedThriftReader; +import parquet.format.event.TypedConsumer.I32Consumer; +import parquet.format.event.TypedConsumer.I64Consumer; +import parquet.format.event.TypedConsumer.StringConsumer; + /** * Utility to read/write metadata * We use the TCompactProtocol to serialize metadata @@ -34,6 +53,129 @@ public static void writeFileMetaData(parquet.format.FileMetaData fileMetadata, O public static FileMetaData readFileMetaData(InputStream from) throws IOException { return read(from, new FileMetaData()); } + /** + * reads the meta data from the stream + * @param from the stream to read the metadata from + * @param skipRowGroups whether row groups should be skipped + * @return the resulting metadata + * @throws IOException + */ + public static FileMetaData readFileMetaData(InputStream from, boolean skipRowGroups) throws IOException { + FileMetaData md = new FileMetaData(); + if (skipRowGroups) { + readFileMetaData(from, new DefaultFileMetaDataConsumer(md), skipRowGroups); + } else { + read(from, md); + } + return md; + } + + /** + * To read metadata in a streaming fashion. + * + * @author Julien Le Dem + * + */ + public static abstract class FileMetaDataConsumer { + abstract public void setVersion(int version); + abstract public void setSchema(List schema); + abstract public void setNumRows(long numRows); + abstract public void addRowGroup(RowGroup rowGroup); + abstract public void addKeyValueMetaData(KeyValue kv); + abstract public void setCreatedBy(String createdBy); + } + + /** + * Simple default consumer that sets the fields + * + * @author Julien Le Dem + * + */ + public static final class DefaultFileMetaDataConsumer extends FileMetaDataConsumer { + private final FileMetaData md; + + public DefaultFileMetaDataConsumer(FileMetaData md) { + this.md = md; + } + + @Override + public void setVersion(int version) { + md.setVersion(version); + } + + @Override + public void setSchema(List schema) { + md.setSchema(schema); + } + + @Override + public void setNumRows(long numRows) { + md.setNum_rows(numRows); + } + + @Override + public void setCreatedBy(String createdBy) { + md.setCreated_by(createdBy); + } + + @Override + public void addRowGroup(RowGroup rowGroup) { + md.addToRow_groups(rowGroup); + } + + @Override + public void addKeyValueMetaData(KeyValue kv) { + md.addToKey_value_metadata(kv); + } + } + + public static void readFileMetaData(InputStream from, FileMetaDataConsumer consumer) throws IOException { + readFileMetaData(from, consumer, false); + } + + public static void readFileMetaData(InputStream from, final FileMetaDataConsumer consumer, boolean skipRowGroups) throws IOException { + try { + DelegatingFieldConsumer eventConsumer = fieldConsumer() + .onField(VERSION, new I32Consumer() { + @Override + public void consume(int value) { + consumer.setVersion(value); + } + }).onField(SCHEMA, listOf(SchemaElement.class, new Consumer>() { + @Override + public void consume(List schema) { + consumer.setSchema(schema); + } + })).onField(NUM_ROWS, new I64Consumer() { + @Override + public void consume(long value) { + consumer.setNumRows(value); + } + }).onField(KEY_VALUE_METADATA, listElementsOf(struct(KeyValue.class, new Consumer() { + @Override + public void consume(KeyValue kv) { + consumer.addKeyValueMetaData(kv); + } + }))).onField(CREATED_BY, new StringConsumer() { + @Override + public void consume(String value) { + consumer.setCreatedBy(value); + } + }); + if (!skipRowGroups) { + eventConsumer = eventConsumer.onField(ROW_GROUPS, listElementsOf(struct(RowGroup.class, new Consumer() { + @Override + public void consume(RowGroup rowGroup) { + consumer.addRowGroup(rowGroup); + } + }))); + } + new EventBasedThriftReader(protocol(from)).readStruct(eventConsumer); + + } catch (TException e) { + throw new IOException("can not read FileMetaData: " + e.getMessage(), e); + } + } private static TProtocol protocol(OutputStream to) { return protocol(new TIOStreamTransport(to)); diff --git a/src/main/java/parquet/format/event/Consumers.java b/src/main/java/parquet/format/event/Consumers.java new file mode 100644 index 000000000..ce2788d44 --- /dev/null +++ b/src/main/java/parquet/format/event/Consumers.java @@ -0,0 +1,181 @@ +package parquet.format.event; + +import static java.util.Collections.unmodifiableMap; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.thrift.TBase; +import org.apache.thrift.TException; +import org.apache.thrift.TFieldIdEnum; +import org.apache.thrift.protocol.TList; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.protocol.TProtocolUtil; + +import parquet.format.event.Consumers.Consumer; +import parquet.format.event.TypedConsumer.BoolConsumer; +import parquet.format.event.TypedConsumer.ListConsumer; +import parquet.format.event.TypedConsumer.StructConsumer; + +/** + * Entry point for reading thrift in a streaming fashion + * + * @author Julien Le Dem + * + */ +public class Consumers { + + /** + * To consume objects coming from a DelegatingFieldConsumer + * @author Julien Le Dem + * + * @param the type of consumed objects + */ + public static interface Consumer { + void consume(T t); + } + + /** + * Delegates reading the field to TypedConsumers. + * There is one TypedConsumer per thrift type. + * use {@link DelegatingFieldConsumer#onField(TFieldIdEnum, BoolConsumer)} et al. to consume specific thrift fields. + * @see Consumers#fieldConsumer() + * @author Julien Le Dem + * + */ + public static class DelegatingFieldConsumer implements FieldConsumer { + + private final Map contexts; + private final FieldConsumer defaultFieldEventConsumer; + + private DelegatingFieldConsumer(FieldConsumer defaultFieldEventConsumer, Map contexts) { + this.defaultFieldEventConsumer = defaultFieldEventConsumer; + this.contexts = unmodifiableMap(contexts); + } + + private DelegatingFieldConsumer() { + this(new SkippingFieldConsumer()); + } + + private DelegatingFieldConsumer(FieldConsumer defaultFieldEventConsumer) { + this(defaultFieldEventConsumer, Collections.emptyMap()); + } + + public DelegatingFieldConsumer onField(TFieldIdEnum e, TypedConsumer typedConsumer) { + Map newContexts = new HashMap(contexts); + newContexts.put(e.getThriftFieldId(), typedConsumer); + return new DelegatingFieldConsumer(defaultFieldEventConsumer, newContexts); + } + + @Override + public void consumeField( + TProtocol protocol, EventBasedThriftReader reader, + short id, byte type) throws TException { + TypedConsumer delegate = contexts.get(id); + if (delegate != null) { + delegate.read(protocol, reader, type); + } else { + defaultFieldEventConsumer.consumeField(protocol, reader, id, type); + } + } + } + + /** + * call onField on the resulting DelegatingFieldConsumer to handle individual fields + * @return a new DelegatingFieldConsumer + */ + public static DelegatingFieldConsumer fieldConsumer() { + return new DelegatingFieldConsumer(); + } + + /** + * To consume a list of elements + * @param c the type of the list content + * @param consumer the consumer that will receive the list + * @return a ListConsumer that can be passed to the DelegatingFieldConsumer + */ + public static > ListConsumer listOf(Class c, final Consumer> consumer) { + class ListConsumer implements Consumer { + List list; + @Override + public void consume(T t) { + list.add(t); + } + } + final ListConsumer co = new ListConsumer(); + return new DelegatingListElementsConsumer(struct(c, co)) { + @Override + public void consumeList(TProtocol protocol, + EventBasedThriftReader reader, TList tList) throws TException { + co.list = new ArrayList(); + super.consumeList(protocol, reader, tList); + consumer.consume(co.list); + } + }; + } + + /** + * To consume list elements one by one + * @param consumer the consumer that will read the elements + * @return a ListConsumer that can be passed to the DelegatingFieldConsumer + */ + public static ListConsumer listElementsOf(TypedConsumer consumer) { + return new DelegatingListElementsConsumer(consumer); + } + + public static > StructConsumer struct(final Class c, final Consumer consumer) { + return new TBaseStructConsumer(c, consumer); + } +} + +class SkippingFieldConsumer implements FieldConsumer { + @Override + public void consumeField(TProtocol protocol, EventBasedThriftReader reader, short id, byte type) throws TException { + TProtocolUtil.skip(protocol, type); + } +} + +class DelegatingListElementsConsumer extends ListConsumer { + + private TypedConsumer elementConsumer; + + protected DelegatingListElementsConsumer(TypedConsumer consumer) { + this.elementConsumer = consumer; + } + + @Override + public void consumeElement(TProtocol protocol, EventBasedThriftReader reader, byte elemType) throws TException { + elementConsumer.read(protocol, reader, elemType); + } +} +class TBaseStructConsumer> extends StructConsumer { + + private final Class c; + private Consumer consumer; + + public TBaseStructConsumer(Class c, Consumer consumer) { + this.c = c; + this.consumer = consumer; + } + + @Override + public void consumeStruct(TProtocol protocol, EventBasedThriftReader reader) throws TException { + T o = newObject(); + o.read(protocol); + consumer.consume(o); + } + + protected T newObject() { + try { + return c.newInstance(); + } catch (InstantiationException e) { + throw new RuntimeException(c.getName(), e); + } catch (IllegalAccessException e) { + throw new RuntimeException(c.getName(), e); + } + } + +} \ No newline at end of file diff --git a/src/main/java/parquet/format/event/EventBasedThriftReader.java b/src/main/java/parquet/format/event/EventBasedThriftReader.java new file mode 100644 index 000000000..2e66d4154 --- /dev/null +++ b/src/main/java/parquet/format/event/EventBasedThriftReader.java @@ -0,0 +1,111 @@ +package parquet.format.event; + +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TField; +import org.apache.thrift.protocol.TList; +import org.apache.thrift.protocol.TMap; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.protocol.TSet; +import org.apache.thrift.protocol.TType; + +import parquet.format.event.TypedConsumer.ListConsumer; +import parquet.format.event.TypedConsumer.MapConsumer; +import parquet.format.event.TypedConsumer.SetConsumer; + +/** + * Event based reader for Thrift + * + * @author Julien Le Dem + * + */ +public final class EventBasedThriftReader { + + private final TProtocol protocol; + + /** + * @param protocol the protocol to read from + */ + public EventBasedThriftReader(TProtocol protocol) { + this.protocol = protocol; + } + + /** + * reads a Struct from the underlying protocol and passes the field events to the FieldConsumer + * @param c the field consumer + * @throws TException + */ + public void readStruct(FieldConsumer c) throws TException { + protocol.readStructBegin(); + readStructContent(c); + protocol.readStructEnd(); + } + + /** + * reads the content of a struct (fields) from the underlying protocol and passes the events to c + * @param c the field consumer + * @throws TException + */ + public void readStructContent(FieldConsumer c) throws TException { + TField field; + while (true) { + field = protocol.readFieldBegin(); + if (field.type == TType.STOP) { + break; + } + c.consumeField(protocol, this, field.id, field.type); + } + } + + /** + * reads the set content (elements) from the underlying protocol and passes the events to the set event consumer + * @param eventConsumer the consumer + * @param tSet the set descriptor + * @throws TException + */ + public void readSetContent(SetConsumer eventConsumer, TSet tSet) + throws TException { + for (int i = 0; i < tSet.size; i++) { + eventConsumer.consumeElement(protocol, this, tSet.elemType); + } + } + + /** + * reads the map content (key values) from the underlying protocol and passes the events to the map event consumer + * @param eventConsumer the consumer + * @param tMap the map descriptor + * @throws TException + */ + public void readMapContent(MapConsumer eventConsumer, TMap tMap) + throws TException { + for (int i = 0; i < tMap.size; i++) { + eventConsumer.consumeEntry(protocol, this, tMap.keyType, tMap.valueType); + } + } + + /** + * reads a key-value pair + * @param keyType the type of the key + * @param keyConsumer the consumer for the key + * @param valueType the type of the value + * @param valueConsumer the consumer for the value + * @throws TException + */ + public void readMapEntry(byte keyType, TypedConsumer keyConsumer, byte valueType, TypedConsumer valueConsumer) + throws TException { + keyConsumer.read(protocol, this, keyType); + valueConsumer.read(protocol, this, valueType); + } + + /** + * reads the list content (elements) from the underlying protocol and passes the events to the list event consumer + * @param eventConsumer the consumer + * @param tList the list descriptor + * @throws TException + */ + public void readListContent(ListConsumer eventConsumer, TList tList) + throws TException { + for (int i = 0; i < tList.size; i++) { + eventConsumer.consumeElement(protocol, this, tList.elemType); + } + } +} \ No newline at end of file diff --git a/src/main/java/parquet/format/event/FieldConsumer.java b/src/main/java/parquet/format/event/FieldConsumer.java new file mode 100644 index 000000000..d8dd038ee --- /dev/null +++ b/src/main/java/parquet/format/event/FieldConsumer.java @@ -0,0 +1,25 @@ +package parquet.format.event; + +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TProtocol; + +/** + * To receive Thrift field events + * + * @author Julien Le Dem + * + */ +public interface FieldConsumer { + + /** + * called by the EventBasedThriftReader when reading a field from a Struct + * @param protocol the underlying protocol + * @param eventBasedThriftReader the reader to delegate to further calls. + * @param id the id of the field + * @param type the type of the field + * @return the typed consumer to pass the value to + * @throws TException + */ + public void consumeField(TProtocol protocol, EventBasedThriftReader eventBasedThriftReader, short id, byte type) throws TException; + +} \ No newline at end of file diff --git a/src/main/java/parquet/format/event/TypedConsumer.java b/src/main/java/parquet/format/event/TypedConsumer.java new file mode 100644 index 000000000..f4e6fe1e0 --- /dev/null +++ b/src/main/java/parquet/format/event/TypedConsumer.java @@ -0,0 +1,186 @@ +package parquet.format.event; + +import static org.apache.thrift.protocol.TType.BOOL; +import static org.apache.thrift.protocol.TType.BYTE; +import static org.apache.thrift.protocol.TType.DOUBLE; +import static org.apache.thrift.protocol.TType.I16; +import static org.apache.thrift.protocol.TType.I32; +import static org.apache.thrift.protocol.TType.I64; +import static org.apache.thrift.protocol.TType.LIST; +import static org.apache.thrift.protocol.TType.MAP; +import static org.apache.thrift.protocol.TType.SET; +import static org.apache.thrift.protocol.TType.STRING; +import static org.apache.thrift.protocol.TType.STRUCT; + +import org.apache.thrift.TException; +import org.apache.thrift.protocol.TList; +import org.apache.thrift.protocol.TMap; +import org.apache.thrift.protocol.TProtocol; +import org.apache.thrift.protocol.TSet; + +/** + * receive thrift events of a given type + * + * @author Julien Le Dem + * + */ +abstract public class TypedConsumer { + + abstract public static class DoubleConsumer extends TypedConsumer { + protected DoubleConsumer() { super(DOUBLE); } + @Override + final void read(TProtocol protocol, EventBasedThriftReader reader) throws TException { + this.consume(protocol.readDouble()); + } + abstract public void consume(double value); + } + + abstract public static class ByteConsumer extends TypedConsumer { + protected ByteConsumer() { super(BYTE); } + @Override + final void read(TProtocol protocol, EventBasedThriftReader reader) throws TException { + this.consume(protocol.readByte()); + } + abstract public void consume(byte value); + } + + abstract public static class BoolConsumer extends TypedConsumer { + protected BoolConsumer() { super(BOOL); } + @Override + final void read(TProtocol protocol, EventBasedThriftReader reader) throws TException { + this.consume(protocol.readBool()); + } + abstract public void consume(boolean value); + } + + abstract public static class I32Consumer extends TypedConsumer { + protected I32Consumer() { super(I32); } + @Override + final void read(TProtocol protocol, EventBasedThriftReader reader) throws TException { + this.consume(protocol.readI32()); + } + abstract public void consume(int value); + } + + abstract public static class I64Consumer extends TypedConsumer { + protected I64Consumer() { super(I64); } + final void read(TProtocol protocol, EventBasedThriftReader reader) throws TException { + this.consume(protocol.readI64()); + } + abstract public void consume(long value); + } + + abstract public static class I16Consumer extends TypedConsumer { + protected I16Consumer() { super(I16); } + @Override + final void read(TProtocol protocol, EventBasedThriftReader reader) throws TException { + this.consume(protocol.readI16()); + } + abstract public void consume(short value); + } + + abstract public static class StringConsumer extends TypedConsumer { + protected StringConsumer() { super(STRING); } + @Override + final void read(TProtocol protocol, EventBasedThriftReader reader) throws TException { + this.consume(protocol.readString()); + } + abstract public void consume(String value); + } + + abstract public static class StructConsumer extends TypedConsumer { + protected StructConsumer() { super(STRUCT); } + @Override + final void read(TProtocol protocol, EventBasedThriftReader reader) throws TException { + this.consumeStruct(protocol, reader); + } + /** + * can either delegate to the reader or read the struct from the protocol + * reader.readStruct(fieldConsumer); + * @param protocol the underlying protocol + * @param reader the reader to delegate to + * @throws TException + */ + abstract public void consumeStruct(TProtocol protocol, EventBasedThriftReader reader) throws TException; + } + + abstract public static class ListConsumer extends TypedConsumer { + protected ListConsumer() { super(LIST); } + @Override + final void read(TProtocol protocol, EventBasedThriftReader reader) throws TException { + this.consumeList(protocol, reader, protocol.readListBegin()); + protocol.readListEnd(); + } + public void consumeList(TProtocol protocol, EventBasedThriftReader reader, TList tList) throws TException { + reader.readListContent(this, tList); + } + /** + * can either delegate to the reader or read the element from the protocol + * @param protocol the underlying protocol + * @param reader the reader to delegate to + * @throws TException + */ + abstract public void consumeElement(TProtocol protocol, EventBasedThriftReader reader, byte elemType) throws TException; + } + + abstract public static class SetConsumer extends TypedConsumer { + protected SetConsumer() { super(SET); } + @Override + final void read(TProtocol protocol, EventBasedThriftReader reader) throws TException { + this.consumeSet(protocol, reader, protocol.readSetBegin()); + protocol.readSetEnd(); + } + public void consumeSet(TProtocol protocol, EventBasedThriftReader reader, TSet tSet) throws TException { + reader.readSetContent(this, tSet); + } + /** + * can either delegate to the reader or read the set from the protocol + * @param protocol the underlying protocol + * @param reader the reader to delegate to + * @throws TException + */ + abstract public void consumeElement( + TProtocol protocol, EventBasedThriftReader reader, + byte elemType) throws TException; + } + + abstract public static class MapConsumer extends TypedConsumer { + protected MapConsumer() { super(MAP); } + @Override + final void read(TProtocol protocol, EventBasedThriftReader reader) + throws TException { + this.consumeMap(protocol, reader , protocol.readMapBegin()); + protocol.readMapEnd(); + } + public void consumeMap(TProtocol protocol, EventBasedThriftReader reader, TMap tMap) throws TException { + reader.readMapContent(this, tMap); + } + /** + * can either delegate to the reader or read the map entry from the protocol + * @param protocol the underlying protocol + * @param reader the reader to delegate to + * @throws TException + */ + abstract public void consumeEntry( + TProtocol protocol, EventBasedThriftReader reader, + byte keyType, byte valueType) throws TException; + } + + public final byte type; + + private TypedConsumer(byte type) { + this.type = type; + } + + final public void read(TProtocol protocol, EventBasedThriftReader reader, byte type) throws TException { + if (this.type != type) { + throw new TException( + "Incorrect type in stream. " + + "Expected " + this.type + + " but got " + type); + } + this.read(protocol, reader); + } + + abstract void read(TProtocol protocol, EventBasedThriftReader reader) throws TException; +} \ No newline at end of file diff --git a/src/test/java/parquet/format/TestUtil.java b/src/test/java/parquet/format/TestUtil.java new file mode 100644 index 000000000..14f814de9 --- /dev/null +++ b/src/test/java/parquet/format/TestUtil.java @@ -0,0 +1,65 @@ +package parquet.format; + +import static java.util.Arrays.asList; +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertNull; +import static parquet.format.Util.readFileMetaData; +import static parquet.format.Util.writeFileMetaData; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; + +import org.junit.Test; + +import parquet.format.Util.DefaultFileMetaDataConsumer; +public class TestUtil { + + @Test + public void testReadFileMetadata() throws Exception { + ByteArrayOutputStream baos = new ByteArrayOutputStream(); + FileMetaData md = new FileMetaData( + 1, + asList(new SchemaElement("foo")), + 10, + asList( + new RowGroup( + asList( + new ColumnChunk(0), + new ColumnChunk(1) + ), + 10, + 5), + new RowGroup( + asList( + new ColumnChunk(2), + new ColumnChunk(3) + ), + 11, + 5) + ) + ); + writeFileMetaData(md , baos); + FileMetaData md2 = readFileMetaData(in(baos)); + FileMetaData md3 = new FileMetaData(); + readFileMetaData(in(baos), new DefaultFileMetaDataConsumer(md3)); + FileMetaData md4 = new FileMetaData(); + readFileMetaData(in(baos), new DefaultFileMetaDataConsumer(md4), true); + FileMetaData md5 = readFileMetaData(in(baos), true); + FileMetaData md6 = readFileMetaData(in(baos), false); + assertEquals(md, md2); + assertEquals(md, md3); + assertNull(md4.getRow_groups()); + assertNull(md5.getRow_groups()); + assertEquals(md4, md5); + md4.setRow_groups(md.getRow_groups()); + md5.setRow_groups(md.getRow_groups()); + assertEquals(md, md4); + assertEquals(md, md5); + assertEquals(md4, md5); + assertEquals(md, md6); + } + + private ByteArrayInputStream in(ByteArrayOutputStream baos) { + return new ByteArrayInputStream(baos.toByteArray()); + } +}