Skip to content

Commit 7f8e952

Browse files
Piyush Narang authored and julienledem committed
PARQUET-642: Improve performance of ByteBuffer based read / write paths
While trying out the newest Parquet version, we noticed that the changes to start using ByteBuffers, 6b605a4 and 6b24a1d (mostly Avro, but with a couple of ByteBuffer changes), caused our jobs to slow down a bit. Read overhead: 4-6% (in MB_Millis). Write overhead: 6-10% (in MB_Millis). This appears to be due to the encoding / decoding of Strings in the [Binary class](https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java): [toStringUsingUTF8()](https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java#L388) for reads, and [encodeUTF8()](https://github.com/apache/parquet-mr/blob/master/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java#L236) for writes. With these changes we see around a 5% improvement in MB_Millis while running the job on our Hadoop cluster. Added some microbenchmark details to the JIRA. Note that I've left the behavior the same for the Avro write path - it still uses CharSequence and the Charset-based encoders. Author: Piyush Narang <pnarang@twitter.com> Closes #347 from piyushnarang/bytebuffer-encoding-fix-pr and squashes the following commits: 43c5bdd [Piyush Narang] Keep avro on char sequence 2d50c8c [Piyush Narang] Update Binary approach 9e58237 [Piyush Narang] Proof of concept fixes
1 parent 9c40a7b commit 7f8e952

File tree

2 files changed

+53
-22
lines changed

2 files changed

+53
-22
lines changed

parquet-avro/src/main/java/org/apache/parquet/avro/AvroWriteSupport.java

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -364,7 +364,7 @@ private Binary fromAvroString(Object value) {
364364
Utf8 utf8 = (Utf8) value;
365365
return Binary.fromReusedByteArray(utf8.getBytes(), 0, utf8.getByteLength());
366366
}
367-
return Binary.fromString((CharSequence) value);
367+
return Binary.fromCharSequence((CharSequence) value);
368368
}
369369

370370
private static GenericData getDataModel(Configuration conf) {

parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java

Lines changed: 52 additions & 21 deletions
Original file line number | Diff line number | Diff line change
@@ -31,6 +31,7 @@
3131
import java.nio.charset.StandardCharsets;
3232
import java.util.Arrays;
3333

34+
import org.apache.parquet.io.ParquetDecodingException;
3435
import org.apache.parquet.io.ParquetEncodingException;
3536

3637
import static org.apache.parquet.bytes.BytesUtils.UTF8;
@@ -214,7 +215,28 @@ public void writeTo(DataOutput out) throws IOException {
214215
}
215216

216217
private static class FromStringBinary extends ByteBufferBackedBinary {
217-
public FromStringBinary(CharSequence value) {
218+
public FromStringBinary(String value) {
219+
// reused is false, because we do not hold on to the buffer after
220+
// conversion, and nobody else has a handle to it
221+
super(encodeUTF8(value), false);
222+
}
223+
224+
@Override
225+
public String toString() {
226+
return "Binary{\"" + toStringUsingUTF8() + "\"}";
227+
}
228+
229+
private static ByteBuffer encodeUTF8(String value) {
230+
try {
231+
return ByteBuffer.wrap(value.getBytes("UTF-8"));
232+
} catch (UnsupportedEncodingException e) {
233+
throw new ParquetEncodingException("UTF-8 not supported.", e);
234+
}
235+
}
236+
}
237+
238+
private static class FromCharSequenceBinary extends ByteBufferBackedBinary {
239+
public FromCharSequenceBinary(CharSequence value) {
218240
// reused is false, because we do not hold on to the buffer after
219241
// conversion, and nobody else has a handle to it
220242
super(encodeUTF8(value), false);
@@ -226,12 +248,12 @@ public String toString() {
226248
}
227249

228250
private static final ThreadLocal<CharsetEncoder> ENCODER =
229-
new ThreadLocal<CharsetEncoder>() {
230-
@Override
231-
protected CharsetEncoder initialValue() {
232-
return StandardCharsets.UTF_8.newEncoder();
233-
}
234-
};
251+
new ThreadLocal<CharsetEncoder>() {
252+
@Override
253+
protected CharsetEncoder initialValue() {
254+
return StandardCharsets.UTF_8.newEncoder();
255+
}
256+
};
235257

236258
private static ByteBuffer encodeUTF8(CharSequence value) {
237259
try {
@@ -386,16 +408,26 @@ public ByteBufferBackedBinary(ByteBuffer value, int offset, int length, boolean
386408

387409
@Override
388410
public String toStringUsingUTF8() {
389-
int limit = value.limit();
390-
value.limit(offset+length);
391-
int position = value.position();
392-
value.position(offset);
393-
// no corresponding interface to read a subset of a buffer, would have to slice it
394-
// which creates another ByteBuffer object or do what is done here to adjust the
395-
// limit/offset and set them back after
396-
String ret = UTF8.decode(value).toString();
397-
value.limit(limit);
398-
value.position(position);
411+
String ret;
412+
if (value.hasArray()) {
413+
try {
414+
ret = new String(value.array(), value.arrayOffset() + offset, length, "UTF-8");
415+
} catch (UnsupportedEncodingException e) {
416+
throw new ParquetDecodingException("UTF-8 not supported");
417+
}
418+
} else {
419+
int limit = value.limit();
420+
value.limit(offset+length);
421+
int position = value.position();
422+
value.position(offset);
423+
// no corresponding interface to read a subset of a buffer, would have to slice it
424+
// which creates another ByteBuffer object or do what is done here to adjust the
425+
// limit/offset and set them back after
426+
ret = UTF8.decode(value).toString();
427+
value.limit(limit);
428+
value.position(position);
429+
}
430+
399431
return ret;
400432
}
401433

@@ -555,12 +587,11 @@ public static Binary fromByteBuffer(final ByteBuffer value) {
555587
}
556588

557589
public static Binary fromString(String value) {
558-
// this method is for binary backward-compatibility
559-
return fromString((CharSequence) value);
590+
return new FromStringBinary(value);
560591
}
561592

562-
public static Binary fromString(CharSequence value) {
563-
return new FromStringBinary(value);
593+
public static Binary fromCharSequence(CharSequence value) {
594+
return new FromCharSequenceBinary(value);
564595
}
565596

566597
/**

0 commit comments

Comments
 (0)