@@ -62,6 +62,7 @@
import org.apache.parquet.column.page.DictionaryPageReadStore;
import org.apache.parquet.filter2.compat.FilterCompat;
import org.apache.parquet.filter2.compat.RowGroupFilter;
import org.apache.parquet.hadoop.util.CompatibilityReader;
import org.apache.parquet.hadoop.util.CompatibilityUtil;

import org.apache.parquet.Log;
@@ -102,6 +103,13 @@ public class ParquetFileReader implements Closeable {

public static String PARQUET_READ_PARALLELISM = "parquet.metadata.read.parallelism";

// Configures whether we want to use Hadoop's V2 read(ByteBuffer) API.
// If true, we try to read using the new Hadoop read(ByteBuffer) API. This reads data into the provided
// ByteBuffer and allows us to potentially take advantage of the zero-copy read path in Hadoop.
// Otherwise, we either read into the ByteBuffer's backing array (if it has one) or allocate a
// byte[] and copy into it.
public static String PARQUET_HADOOP_BYTEBUFFER_READ = "parquet.read.use.byte.buffer";
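For reference, a short sketch of how a job could flip this knob through the Hadoop Configuration; the property name is the one defined above, the rest is illustrative:

import org.apache.hadoop.conf.Configuration;

public class ByteBufferReadKnob {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Opt out of the Hadoop V2 read(ByteBuffer) path and use the byte[]-based reads instead.
    conf.setBoolean("parquet.read.use.byte.buffer", false);
    System.out.println(conf.getBoolean("parquet.read.use.byte.buffer", true));
  }
}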
Contributor:
Why would we want to turn this property off? Does your performance eval use the byte buffer read path?

Author:
Yeah, @isnotinvain and I had some discussion about this property as well (see his comments above). His suggestion was to get rid of this property altogether. We can either:

  1. Cache in a static var whether we computed Hadoop V1 / V2 classes and reuse that. Saves us the reflection call on subsequent files.
  2. Perform the Hadoop V1 / V2 check every time we start reading a file. If there is a chance that the Hadoop V1 / V2 situation changes in the lifetime of the JVM, this will help us pick up the appropriate approach. Not sure what the use-case is for this though. I added the knob here to save us the reflection call when you know for sure you're on Hadoop V1.

Personally I like option 1. We pay the cost once and it helps us get rid of one knob to tune. Thoughts?
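A minimal sketch of option 1, assuming a static cache of the detected reader; the class name and the double-checked locking are illustrative, not part of this patch:

import org.apache.parquet.hadoop.util.CompatibilityReader;
import org.apache.parquet.hadoop.util.CompatibilityReaderV1;

// Option 1 sketch: detect the available Hadoop read API once per JVM and cache the
// resulting reader, so subsequent files skip the Class.forName/reflection check entirely.
public final class CachedReaderSketch {
  private static volatile CompatibilityReader cachedReader;

  public static CompatibilityReader getReader() {
    CompatibilityReader reader = cachedReader;
    if (reader == null) {
      synchronized (CachedReaderSketch.class) {
        if (cachedReader == null) {
          cachedReader = detectReader();   // the one-time reflection cost
        }
        reader = cachedReader;
      }
    }
    return reader;
  }

  private static CompatibilityReader detectReader() {
    try {
      // Presence of the Hadoop 2.x ByteBuffer read capability decides which reader to use.
      Class.forName("org.apache.hadoop.fs.ByteBufferReadable");
      return (CompatibilityReader) Class
          .forName("org.apache.parquet.hadoop.util.CompatibilityReaderV2")
          .newInstance();
    } catch (ReflectiveOperationException e) {
      return new CompatibilityReaderV1();
    } catch (NoClassDefFoundError e) {
      return new CompatibilityReaderV1();
    }
  }
}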

Contributor (@rdblue, Jun 28, 2016):
I'm currently testing something out on this. Basically, we should get the class for ByteBufferReadable, then use it like this:

if (bbReadableClass != null && newV2ReaderCtor != null && bbReadableClass.isInstance(stream)) {
  return newV2ReaderCtor.newInstance(stream);
} else {
  return new V1Reader(stream);
}
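For context, a hedged sketch of how bbReadableClass and newV2ReaderCtor from the snippet above could be resolved; the stream-taking V2 constructor is hypothetical (the reader in this patch uses a default constructor):

import java.lang.reflect.Constructor;

import org.apache.hadoop.fs.FSDataInputStream;

// Illustrative only: resolve the capability class and the (hypothetical) V2 reader
// constructor once, so the per-stream check is a cheap isInstance() call.
final class ReaderLookupSketch {
  static final Class<?> bbReadableClass = loadClassOrNull("org.apache.hadoop.fs.ByteBufferReadable");
  static final Constructor<?> newV2ReaderCtor = loadV2CtorOrNull();

  private static Class<?> loadClassOrNull(String name) {
    try {
      return Class.forName(name);
    } catch (ClassNotFoundException e) {
      return null;                 // Hadoop 1.x classpath
    } catch (NoClassDefFoundError e) {
      return null;                 // class present but a dependency is missing
    }
  }

  private static Constructor<?> loadV2CtorOrNull() {
    try {
      return Class.forName("org.apache.parquet.hadoop.util.CompatibilityReaderV2")
          .getConstructor(FSDataInputStream.class);
    } catch (ReflectiveOperationException e) {
      return null;
    } catch (NoClassDefFoundError e) {
      return null;
    }
  }
}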


private static ParquetMetadataConverter converter = new ParquetMetadataConverter();

/**
@@ -487,6 +495,7 @@ public static ParquetFileReader open(Configuration conf, Path file, ParquetMetad
private final FileMetaData fileMetaData; // may be null
private final ByteBufferAllocator allocator;
private final Configuration conf;
private final CompatibilityReader compatibilityReader;

// not final. in some cases, this may be lazily loaded for backward-compat.
private ParquetMetadata footer;
@@ -529,6 +538,7 @@ public ParquetFileReader(
// the codec factory to get decompressors
this.codecFactory = new CodecFactory(configuration, 0);
this.allocator = new HeapByteBufferAllocator();
this.compatibilityReader = CompatibilityUtil.getHadoopReader(conf.getBoolean(PARQUET_HADOOP_BYTEBUFFER_READ, true));
}

/**
@@ -561,6 +571,7 @@ public ParquetFileReader(Configuration conf, Path file, MetadataFilter filter) t
// the codec factory to get decompressors
this.codecFactory = new CodecFactory(conf, 0);
this.allocator = new HeapByteBufferAllocator();
this.compatibilityReader = CompatibilityUtil.getHadoopReader(conf.getBoolean(PARQUET_HADOOP_BYTEBUFFER_READ, true));
}

/**
@@ -584,6 +595,7 @@ public ParquetFileReader(Configuration conf, Path file, ParquetMetadata footer)
// the codec factory to get decompressors
this.codecFactory = new CodecFactory(conf, 0);
this.allocator = new HeapByteBufferAllocator();
this.compatibilityReader = CompatibilityUtil.getHadoopReader(conf.getBoolean(PARQUET_HADOOP_BYTEBUFFER_READ, true));
}

public ParquetMetadata getFooter() {
@@ -950,7 +962,7 @@ protected PageHeader readPageHeader() throws IOException {
// to allow reading older files (using dictionary) we need this.
// usually 13 to 19 bytes are missing
// if the last page is smaller than this, the page header itself is truncated in the buffer.
this.byteBuf.rewind(); // resetting the buffer to the position before we got the error
this.byteBuf.position(initialPos); // resetting the buffer to the position before we got the error
Contributor:
was this a bug?

Author:
Yeah, seems like it. rewind() sets the position to 0. I think this was fixed by @jaltekruse in one of the commits I pulled in to complete this PR.
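A tiny java.nio illustration of the difference (not Parquet-specific):

import java.nio.ByteBuffer;

// rewind() always resets the position to 0; position(initialPos) restores the
// position saved before the failed parse attempt, which is what the retry needs.
public class RewindVsPosition {
  public static void main(String[] args) {
    ByteBuffer buf = ByteBuffer.allocate(16);
    buf.position(5);
    int initialPos = buf.position();    // 5: where the page header read started

    buf.position(12);                   // simulate a partial, failed parse
    buf.rewind();
    System.out.println(buf.position()); // 0: the starting offset is lost

    buf.position(12);                   // same failed parse again
    buf.position(initialPos);
    System.out.println(buf.position()); // 5: back where the attempt began
  }
}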

Contributor:
Is there a test for this? We should make sure we have one.

Author:
I'm not sure this code is even needed any more. The WorkaroundChunk class was added 3 years ago and the comment from then says "it deals with a now fixed bug where compressedLength was missing a few bytes". While we do seem to exercise it (we always use it for the last chunk), I'm not sure how often the bug this catch was meant to solve actually gets hit.
If we need it, I can try to create a unit test for the WorkaroundChunk class. I don't see a straightforward way to test it as part of the FileReader, as it is too many levels deep in the class.

LOG.info("completing the column chunk to read the page header");
pageHeader = Util.readPageHeader(new SequenceInputStream(this, f)); // trying again from the buffer + remainder of the stream.
}
@@ -1039,8 +1051,11 @@ public void addChunk(ChunkDescriptor descriptor) {
public List<Chunk> readAll(FSDataInputStream f) throws IOException {
List<Chunk> result = new ArrayList<Chunk>(chunks.size());
f.seek(offset);

//Allocate the bytebuffer based on whether the FS can support it.
Contributor:
Nit: There should be a space before "Allocate".

Author:
fixed

ByteBuffer chunksByteBuffer = allocator.allocate(length);
CompatibilityUtil.getBuf(f, chunksByteBuffer, length);
compatibilityReader.readFully(f, chunksByteBuffer);

// report in a counter the data we just scanned
BenchmarkCounter.incrementBytesRead(length);
int currentChunkOffset = 0;
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.hadoop.util;

import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.fs.FSDataInputStream;

/**
* Allows us to use either the Hadoop V1 or V2 read APIs to read data without per-read reflection. The Hadoop V2
* implementation of this interface resides in a module with Hadoop 2 dependencies.
* Note: classes that implement this interface are instantiated using reflection and must thus have a
* default constructor.
*/
public interface CompatibilityReader {

/**
* Attempts to read readBuffer.remaining() bytes into the provided readBuffer.
* @return the number of bytes read - should be readBuffer.remaining()
* @throws EOFException if readBuffer.remaining() is greater than the number of bytes available to
* read on the FSDataInputStream f.
*/
int readFully(FSDataInputStream f, ByteBuffer readBuffer) throws IOException;
}
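A minimal usage sketch against this contract; the file path, buffer size, and surrounding setup are illustrative:

import java.nio.ByteBuffer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;

import org.apache.parquet.hadoop.util.CompatibilityReader;
import org.apache.parquet.hadoop.util.CompatibilityUtil;

public class CompatibilityReaderUsage {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Path file = new Path("/tmp/example.parquet");       // illustrative path
    FSDataInputStream in = file.getFileSystem(conf).open(file);
    try {
      // true = try the Hadoop V2 ByteBuffer read path if it is available
      CompatibilityReader reader = CompatibilityUtil.getHadoopReader(true);
      ByteBuffer buf = ByteBuffer.allocate(4096);       // must not exceed the bytes left in the file
      reader.readFully(in, buf);                        // fills buf.remaining() bytes or throws EOFException
      buf.flip();                                       // ready for decoding
    } finally {
      in.close();
    }
  }
}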
@@ -0,0 +1,58 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.parquet.hadoop.util;

import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.fs.FSDataInputStream;

/**
* Uses Hadoop V1's readFully(byte[], ...) APIs to read data.
*/
public class CompatibilityReaderV1 implements CompatibilityReader {

@Override
public int readFully(FSDataInputStream f, ByteBuffer readBuf) throws IOException {
int res;
if (readBuf.hasArray()) {
res = readWithExistingArray(f, readBuf);
} else {
res = readWithNewArray(f, readBuf);
}

return res;
}

private int readWithExistingArray(FSDataInputStream f, ByteBuffer readBuf) throws IOException {
int initPos = readBuf.position();
int remaining = readBuf.remaining();
f.readFully(readBuf.array(), readBuf.arrayOffset(), remaining);
readBuf.position(initPos + remaining);
return remaining;
}

private int readWithNewArray(FSDataInputStream f, ByteBuffer readBuf) throws IOException {
int remaining = readBuf.remaining();
byte[] buf = new byte[remaining];
f.readFully(buf);
readBuf.put(buf, 0, remaining);
return remaining;
}
}
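The matching Hadoop V2 reader lives in a module with Hadoop 2 dependencies and is not part of this diff; a hedged sketch of what it plausibly looks like, modeled on the read(ByteBuffer) path in the old getBuf() shown further down:

package org.apache.parquet.hadoop.util;

import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;

import org.apache.hadoop.fs.FSDataInputStream;

// Sketch only: the real CompatibilityReaderV2 ships in the Hadoop 2 module and may differ.
// It can call FSDataInputStream.read(ByteBuffer) directly, so no per-read reflection is needed.
public class CompatibilityReaderV2Sketch implements CompatibilityReader {

  @Override
  public int readFully(FSDataInputStream f, ByteBuffer readBuf) throws IOException {
    int totalRead = 0;
    while (readBuf.hasRemaining()) {
      int bytesRead;
      try {
        bytesRead = f.read(readBuf);   // Hadoop 2.x ByteBuffer read; may hit the zero-copy path
      } catch (UnsupportedOperationException e) {
        // The stream exposes the V2 API but does not implement it; fall back to the V1 reader
        // for whatever is still remaining in the buffer.
        return totalRead + new CompatibilityReaderV1().readFully(f, readBuf);
      }
      if (bytesRead < 0) {
        throw new EOFException("Reached the end of stream with "
            + readBuf.remaining() + " bytes left to read");
      }
      totalRead += bytesRead;
    }
    return totalRead;
  }
}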
@@ -18,97 +18,46 @@
*/
package org.apache.parquet.hadoop.util;

import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.parquet.ShouldNeverHappenException;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import org.apache.parquet.Log;

public class CompatibilityUtil {

// Will be set to true if the implementation of FSDataInputSteam supports
// the 2.x APIs, in particular reading using a provided ByteBuffer
private static boolean useV21;
public static final V21FileAPI fileAPI;
private static final String READER_V2_CLASS = "org.apache.parquet.hadoop.util.CompatibilityReaderV2";

private static class V21FileAPI {
private final Method PROVIDE_BUF_READ_METHOD;
private final Class<?> FSDataInputStreamCls;
private static final Log LOG = Log.getLog(CompatibilityUtil.class);

private V21FileAPI() throws ReflectiveOperationException {
final String PACKAGE = "org.apache.hadoop";
FSDataInputStreamCls = Class.forName(PACKAGE + ".fs.FSDataInputStream");
PROVIDE_BUF_READ_METHOD = FSDataInputStreamCls.getMethod("read", ByteBuffer.class);
public static CompatibilityReader getHadoopReader(boolean useV2) {
if (!useV2) {
return new CompatibilityReaderV1();
}
}

static {
// Test to see if a class from the Hadoop 2.x API is available
boolean v21 = true;
try {
Class.forName("org.apache.hadoop.io.compress.DirectDecompressor");
} catch (ClassNotFoundException cnfe) {
v21 = false;
}

useV21 = v21;
try {
if (v21) {
fileAPI = new V21FileAPI();
} else {
fileAPI = null;
}

} catch (ReflectiveOperationException e) {
throw new IllegalArgumentException("Error finding appropriate interfaces using reflection.", e);
if (!isHadoop2x()) {
LOG.info("Can't read Hadoop 2x classes, will be using 1x read APIs");
return new CompatibilityReaderV1();
}

return newV2Reader();
}

private static Object invoke(Method method, String errorMsg, Object instance, Object... args) {
// Test to see if a class from the Hadoop 2.x API is available
// If it is, we try to instantiate the V2 CompatibilityReader.
private static boolean isHadoop2x() {
boolean v2 = true;
try {
return method.invoke(instance, args);
} catch (IllegalAccessException e) {
throw new IllegalArgumentException(errorMsg, e);
} catch (InvocationTargetException e) {
throw new IllegalArgumentException(errorMsg, e);
Class.forName("org.apache.hadoop.io.compress.DirectDecompressor");
Contributor:
Why use DirectDecompressor and not the class we're actually interested in, org.apache.hadoop.fs.ByteBufferReadable?

Author:
Sure, I can switch to ByteBufferReadable. I didn't really revisit it while pulling in Jason's original implementation.

} catch (ClassNotFoundException cnfe) {
Contributor:
This needs to catch NoClassDefFoundError as well, which is thrown when DirectDecompressor exists but a dependency of that class does not. That could happen in strange classpaths.

Author:
Thanks for pointing that out. Fixed.
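A hedged sketch combining both review suggestions (check ByteBufferReadable, and catch NoClassDefFoundError as well); the final version in the patch may differ:

// Returns true only when the Hadoop 2.x ByteBuffer read capability is actually loadable.
private static boolean isHadoop2x() {
  try {
    Class.forName("org.apache.hadoop.fs.ByteBufferReadable");
    return true;
  } catch (ClassNotFoundException e) {
    return false;                 // plain Hadoop 1.x classpath
  } catch (NoClassDefFoundError e) {
    return false;                 // class exists but one of its dependencies does not
  }
}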

v2 = false;
}
return v2;
}

public static int getBuf(FSDataInputStream f, ByteBuffer readBuf, int maxSize) throws IOException {
int res;
if (useV21) {
try {
res = (Integer) fileAPI.PROVIDE_BUF_READ_METHOD.invoke(f, readBuf);
} catch (InvocationTargetException e) {
if (e.getCause() instanceof UnsupportedOperationException) {
// the FSDataInputStream docs say specifically that implementations
// can choose to throw UnsupportedOperationException, so this should
// be a reasonable check to make to see if the interface is
// present but not implemented and we should be falling back
useV21 = false;
return getBuf(f, readBuf, maxSize);
} else if (e.getCause() instanceof IOException) {
throw (IOException) e.getCause();
} else {
// To handle any cases where a Runtime exception occurs and provide
// some additional context information. A stacktrace would just give
// a line number, this at least tells them we were using the version
// of the read method designed for using a ByteBuffer.
throw new IOException("Error reading out of an FSDataInputStream " +
"using the Hadoop 2 ByteBuffer based read method.", e.getCause());
}
} catch (IllegalAccessException e) {
// This method is public because it is defined in an interface,
// there should be no problems accessing it
throw new ShouldNeverHappenException(e);
}
} else {
byte[] buf = new byte[maxSize];
res = f.read(buf);
readBuf.put(buf, 0, res);
private static CompatibilityReader newV2Reader() {
try {
Class<?> reader = Class.forName(READER_V2_CLASS);
return (CompatibilityReader)reader.newInstance();
} catch (ReflectiveOperationException e) {
LOG.warn("Unable to instantiate Hadoop V2 compatibility reader class: " + READER_V2_CLASS + " , will be using 1x read APIs", e);
return new CompatibilityReaderV1();
}
return res;
}
}