diff --git a/parquet-column/src/main/java/org/apache/parquet/column/Encoding.java b/parquet-column/src/main/java/org/apache/parquet/column/Encoding.java index 0a24e76d5e..3f21a4e58d 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/Encoding.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/Encoding.java @@ -31,7 +31,7 @@ import org.apache.parquet.column.page.DictionaryPage; import org.apache.parquet.column.values.ValuesReader; import org.apache.parquet.column.values.bitpacking.ByteBitPackingValuesReader; -import org.apache.parquet.column.values.boundedint.ZeroIntegerValuesReader; +import org.apache.parquet.column.values.rle.ZeroIntegerValuesReader; import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader; import org.apache.parquet.column.values.deltalengthbytearray.DeltaLengthByteArrayValuesReader; import org.apache.parquet.column.values.deltastrings.DeltaByteArrayReader; diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java index 9ed7736f82..e3881f8110 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java @@ -31,7 +31,7 @@ import org.apache.parquet.column.impl.ColumnWriteStoreV2; import org.apache.parquet.column.page.PageWriteStore; import org.apache.parquet.column.values.ValuesWriter; -import org.apache.parquet.column.values.boundedint.DevNullValuesWriter; +import org.apache.parquet.column.values.bitpacking.DevNullValuesWriter; import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForInteger; import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriterForLong; import org.apache.parquet.column.values.deltastrings.DeltaByteArrayWriter; diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/DevNullValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/DevNullValuesWriter.java similarity index 97% rename from parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/DevNullValuesWriter.java rename to parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/DevNullValuesWriter.java index af92941ded..a6fa1e3eed 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/DevNullValuesWriter.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/DevNullValuesWriter.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.parquet.column.values.boundedint; +package org.apache.parquet.column.values.bitpacking; import static org.apache.parquet.column.Encoding.BIT_PACKED; import org.apache.parquet.bytes.BytesInput; diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BitReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BitReader.java deleted file mode 100644 index caea5b51c6..0000000000 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BitReader.java +++ /dev/null @@ -1,124 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.parquet.column.values.boundedint; - -import java.io.IOException; -import java.nio.ByteBuffer; - -import org.apache.parquet.io.ParquetDecodingException; - -class BitReader { - private int currentByte = 0; - private int currentPosition = 8; - private ByteBuffer buf; - private int currentBufferPosition = 0; - private static final int[] byteGetValueMask = new int[8]; - private static final int[] readMask = new int[32]; - private int endBufferPosistion; - - static { - int currentMask = 1; - for (int i = 0; i < byteGetValueMask.length; i++) { - byteGetValueMask[i] = currentMask; - currentMask <<= 1; - } - currentMask = 0; - for (int i = 0; i < readMask.length; i++) { - readMask[i] = currentMask; - currentMask <<= 1; - currentMask += 1; - } - } - - /** - * Prepare to deserialize bit-packed integers from the given array. - * The array is not copied, so must not be mutated during the course of - * reading. - */ - public void prepare(ByteBuffer buf, int offset, int length) { - this.buf = buf; - this.endBufferPosistion = offset + length; - currentByte = 0; - currentPosition = 8; - currentBufferPosition = offset; - } - - /** - * Extract the given bit index from the given value. - */ - private static boolean extractBit(int val, int bit) { - return (val & byteGetValueMask[bit]) != 0; - } - - /** - * Read an integer from the stream which is represented by a specified - * number of bits. - * @param bitsPerValue the number of bits used to represent the integer - */ - public int readNBitInteger(int bitsPerValue) { - int bits = bitsPerValue + currentPosition; - int currentValue = currentByte >>> currentPosition; - int toShift = 8 - currentPosition; - while (bits >= 8) { - currentByte = getNextByte(); - currentValue |= currentByte << toShift; - toShift += 8; - bits -= 8; - } - currentValue &= readMask[bitsPerValue]; - currentPosition = (bitsPerValue + currentPosition) % 8; - return currentValue; - } - - private int getNextByte() { - if (currentBufferPosition < endBufferPosistion) { - return buf.get(currentBufferPosition++) & 0xFF; - } - return 0; - } - - public boolean readBit() throws IOException { - if (currentPosition == 8) { - currentByte = getNextByte(); - currentPosition = 0; - } - return extractBit(currentByte, currentPosition++); - } - - public int readByte() { - currentByte |= (getNextByte() << 8); - int value = (currentByte >>> currentPosition) & 0xFF; - currentByte >>>= 8; - return value; - } - - public int readUnsignedVarint() throws IOException { - int value = 0; - int i = 0; - int b; - while (((b = readByte()) & 0x80) != 0) { - value |= (b & 0x7F) << i; - i += 7; - if (i > 35) { - throw new ParquetDecodingException("Variable length quantity is too long"); - } - } - return value | (b << i); - } -} diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BitWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BitWriter.java deleted file mode 100644 index 9489714b31..0000000000 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BitWriter.java +++ /dev/null @@ -1,167 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.parquet.column.values.boundedint; - -import org.apache.parquet.bytes.ByteBufferAllocator; -import org.apache.parquet.Log; -import org.apache.parquet.bytes.BytesInput; -import org.apache.parquet.bytes.CapacityByteArrayOutputStream; - -class BitWriter { - private static final Log LOG = Log.getLog(BitWriter.class); - private static final boolean DEBUG = false;//Log.DEBUG; - - private CapacityByteArrayOutputStream baos; - private int currentByte = 0; - private int currentBytePosition = 0; - private static final int[] byteToTrueMask = new int[8]; - private static final int[] byteToFalseMask = new int[8]; - private boolean finished = false; - static { - int currentMask = 1; - for (int i = 0; i < byteToTrueMask.length; i++) { - byteToTrueMask[i] = currentMask; - byteToFalseMask[i] = ~currentMask; - currentMask <<= 1; - } - } - - public BitWriter(int initialCapacity, int pageSize, ByteBufferAllocator allocator) { - this.baos = new CapacityByteArrayOutputStream(initialCapacity, pageSize, allocator); - } - - public void writeBit(boolean bit) { - if (DEBUG) LOG.debug("writing: " + (bit ? "1" : "0")); - currentByte = setBytePosition(currentByte, currentBytePosition++, bit); - if (currentBytePosition == 8) { - baos.write(currentByte); - if (DEBUG) LOG.debug("to buffer: " + toBinary(currentByte)); - currentByte = 0; - currentBytePosition = 0; - } - } - - public void writeByte(int val) { - if (DEBUG) LOG.debug("writing: " + toBinary(val) + " (" + val + ")"); - currentByte |= ((val & 0xFF) << currentBytePosition); - baos.write(currentByte); - if (DEBUG) LOG.debug("to buffer: " + toBinary(currentByte)); - currentByte >>>= 8; - } - - /** - * Write the given integer, serialized using the given number of bits. - * It is assumed that the integer can be correctly serialized within - * the provided bit size. - * @param val the value to serialize - * @param bitsToWrite the number of bits to use - */ - public void writeNBitInteger(int val, int bitsToWrite) { - if (DEBUG) LOG.debug("writing: " + toBinary(val, bitsToWrite) + " (" + val + ")"); - val <<= currentBytePosition; - int upperByte = currentBytePosition + bitsToWrite; - currentByte |= val; - while (upperByte >= 8) { - baos.write(currentByte); //this only writes the lowest byte - if (DEBUG) LOG.debug("to buffer: " + toBinary(currentByte)); - upperByte -= 8; - currentByte >>>= 8; - } - currentBytePosition = (currentBytePosition + bitsToWrite) % 8; - } - - private String toBinary(int val, int alignTo) { - String result = Integer.toBinaryString(val); - while (result.length() < alignTo) { - result = "0" + result; - } - return result; - } - - private String toBinary(int val) { - return toBinary(val, 8); - } - - public BytesInput finish() { - if (!finished) { - if (currentBytePosition > 0) { - baos.write(currentByte); - if (DEBUG) LOG.debug("to buffer: " + toBinary(currentByte)); - } - } - finished = true; - return BytesInput.from(baos); - } - - public void reset() { - baos.reset(); - currentByte = 0; - currentBytePosition = 0; - finished = false; - } - - /** - * Set or clear the given bit position in the given byte. - * @param currentByte the byte to mutate - * @param bitOffset the bit to set or clear - * @param newBitValue whether to set or clear the bit - * @return the mutated byte - */ - private static int setBytePosition(int currentByte, int bitOffset, boolean newBitValue) { - if (newBitValue) { - currentByte |= byteToTrueMask[bitOffset]; - } else { - currentByte &= byteToFalseMask[bitOffset]; - } - return currentByte; - } - - //This assumes you will never give it a negative value - public void writeUnsignedVarint(int value) { - while ((value & 0xFFFFFF80) != 0L) { - writeByte((value & 0x7F) | 0x80); - value >>>= 7; - } - writeByte(value & 0x7F); - } - - public int getMemSize() { - // baos = 8 bytes - // currentByte + currentBytePosition = 8 bytes - // the size of baos: - // count : 4 bytes (rounded to 8) - // buf : 12 bytes (8 ptr + 4 length) should technically be rounded to 8 depending on buffer size - return 32 + (int)baos.size(); - } - - public int getCapacity() { - return baos.getCapacity(); - } - - public String memUsageString(String prefix) { - return baos.memUsageString(prefix); - } - - public void close() { - currentByte = 0; - currentBytePosition = 0; - finished = false; - baos.close(); - } -} diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BoundedIntValuesFactory.java b/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BoundedIntValuesFactory.java deleted file mode 100644 index bbbf8dae88..0000000000 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BoundedIntValuesFactory.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.parquet.column.values.boundedint; - -import org.apache.parquet.bytes.ByteBufferAllocator; -import org.apache.parquet.column.values.ValuesReader; -import org.apache.parquet.column.values.ValuesWriter; - -public abstract class BoundedIntValuesFactory { - public static ValuesReader getBoundedReader(int bound) { - return bound == 0 ? new ZeroIntegerValuesReader() : new BoundedIntValuesReader(bound); - } - - public static ValuesWriter getBoundedWriter(int bound, int initialCapacity, int pageSize, ByteBufferAllocator allocator) { - return bound == 0 ? new DevNullValuesWriter() : new BoundedIntValuesWriter(bound, initialCapacity, pageSize, allocator); - } -} diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BoundedIntValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BoundedIntValuesReader.java deleted file mode 100644 index c322125e7e..0000000000 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BoundedIntValuesReader.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.parquet.column.values.boundedint; - -import static org.apache.parquet.Log.DEBUG; - -import java.io.IOException; -import java.nio.ByteBuffer; - -import org.apache.parquet.Log; -import org.apache.parquet.bytes.BytesUtils; -import org.apache.parquet.column.values.ValuesReader; -import org.apache.parquet.io.ParquetDecodingException; - -/** - * @see BoundedIntValuesWriter - */ -class BoundedIntValuesReader extends ValuesReader { - private static final Log LOG = Log.getLog(BoundedIntValuesReader.class); - - private int currentValueCt = 0; - private int currentValue = 0; - private final int bitsPerValue; - private BitReader bitReader = new BitReader(); - private int nextOffset; - - public BoundedIntValuesReader(int bound) { - if (bound == 0) { - throw new ParquetDecodingException("Value bound cannot be 0. Use DevNullColumnReader instead."); - } - bitsPerValue = BytesUtils.getWidthFromMaxInt(bound); - } - - @Override - public int readInteger() { - try { - if (currentValueCt > 0) { - currentValueCt--; - return currentValue; - } - if (bitReader.readBit()) { - currentValue = bitReader.readNBitInteger(bitsPerValue); - currentValueCt = bitReader.readUnsignedVarint() - 1; - } else { - currentValue = bitReader.readNBitInteger(bitsPerValue); - } - return currentValue; - } catch (IOException e) { - throw new ParquetDecodingException("could not read int", e); - } - } - - // This forces it to deserialize into memory. If it wanted - // to, it could just read the bytes (though that number of - // bytes would have to be serialized). This is the flip-side - // to BoundedIntColumnWriter.writeData(BytesOutput) - @Override - public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException { - if (DEBUG) LOG.debug("reading size at "+ offset + ": " + in.get(offset) + " " + in.get(offset + 1) + " " + in.get(offset + 2) + " " + in.get(offset + 3) + " "); - int totalBytes = BytesUtils.readIntLittleEndian(in, offset); - if (DEBUG) LOG.debug("will read "+ totalBytes + " bytes"); - currentValueCt = 0; - currentValue = 0; - bitReader.prepare(in, offset + 4, totalBytes); - if (DEBUG) LOG.debug("will read next from " + (offset + totalBytes + 4)); - this.nextOffset = offset + totalBytes + 4; - } - - @Override - public int getNextOffset() { - return this.nextOffset; - } - - @Override - public void skip() { - readInteger(); - } -} diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BoundedIntValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BoundedIntValuesWriter.java deleted file mode 100644 index a90a6e5fcf..0000000000 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BoundedIntValuesWriter.java +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.parquet.column.values.boundedint; - -import org.apache.parquet.bytes.ByteBufferAllocator; -import static org.apache.parquet.bytes.BytesInput.concat; -import static org.apache.parquet.column.Encoding.RLE; -import org.apache.parquet.Log; -import org.apache.parquet.bytes.BytesInput; -import org.apache.parquet.column.Encoding; -import org.apache.parquet.column.values.ValuesWriter; -import org.apache.parquet.column.values.bitpacking.BitPackingValuesWriter; -import org.apache.parquet.io.ParquetEncodingException; - -/** - * This is a special ColumnWriter for the case when you need to write - * integers in a known range. This is intended primarily for use with - * repetition and definition levels, since the maximum value that will - * be written is known a priori based on the schema. Assumption is that - * the values written are between 0 and the bound, inclusive. - * - * This differs from {@link BitPackingValuesWriter} in that this also performs - * run-length encoding of the data, so is useful when long runs of repeated - * values are expected. - */ -class BoundedIntValuesWriter extends ValuesWriter { - private static final Log LOG = Log.getLog(BoundedIntValuesWriter.class); - - private int currentValue = -1; - private int currentValueCt = -1; - private boolean currentValueIsRepeated = false; - private boolean thereIsABufferedValue = false; - private int shouldRepeatThreshold = 0; - private int bitsPerValue; - private BitWriter bitWriter; - private boolean isFirst = true; - - private static final int[] byteToTrueMask = new int[8]; - static { - int currentMask = 1; - for (int i = 0; i < byteToTrueMask.length; i++) { - byteToTrueMask[i] = currentMask; - currentMask <<= 1; - } - } - - public BoundedIntValuesWriter(int bound, int initialCapacity, int pageSize, ByteBufferAllocator allocator) { - if (bound == 0) { - throw new ParquetEncodingException("Value bound cannot be 0. Use DevNullColumnWriter instead."); - } - this.bitWriter = new BitWriter(initialCapacity, pageSize, allocator); - bitsPerValue = (int)Math.ceil(Math.log(bound + 1)/Math.log(2)); - shouldRepeatThreshold = (bitsPerValue + 9)/(1 + bitsPerValue); - if (Log.DEBUG) LOG.debug("init column with bit width of " + bitsPerValue + " and repeat threshold of " + shouldRepeatThreshold); - } - - @Override - public long getBufferedSize() { - // currentValue + currentValueCt = 8 bytes - // shouldRepeatThreshold + bitsPerValue = 8 bytes - // bitWriter = 8 bytes - // currentValueIsRepeated + isFirst = 2 bytes (rounded to 8 b/c of word boundaries) - return 32 + (bitWriter == null ? 0 : bitWriter.getMemSize()); - } - - // This assumes that the full state must be serialized, since there is no close method - @Override - public BytesInput getBytes() { - serializeCurrentValue(); - BytesInput buf = bitWriter.finish(); - if (Log.DEBUG) LOG.debug("writing a buffer of size " + buf.size() + " + 4 bytes"); - // We serialize the length so that on deserialization we can - // deserialize as we go, instead of having to load everything - // into memory - return concat(BytesInput.fromInt((int)buf.size()), buf); - } - - @Override - public void reset() { - currentValue = -1; - currentValueCt = -1; - currentValueIsRepeated = false; - thereIsABufferedValue = false; - isFirst = true; - bitWriter.reset(); - } - - @Override - public void close() { - bitWriter.close(); - } - - @Override - public void writeInteger(int val) { - if (currentValue == val) { - currentValueCt++; - if (!currentValueIsRepeated && currentValueCt >= shouldRepeatThreshold) { - currentValueIsRepeated = true; - } - } else { - if (!isFirst) { - serializeCurrentValue(); - } else { - isFirst = false; - } - - newCurrentValue(val); - } - } - - private void serializeCurrentValue() { - if (thereIsABufferedValue) { - if (currentValueIsRepeated) { - bitWriter.writeBit(true); - bitWriter.writeNBitInteger(currentValue, bitsPerValue); - bitWriter.writeUnsignedVarint(currentValueCt); - } else { - for (int i = 0; i < currentValueCt; i++) { - bitWriter.writeBit(false); - bitWriter.writeNBitInteger(currentValue, bitsPerValue); - } - } - } - thereIsABufferedValue = false; - } - - private void newCurrentValue(int val) { - currentValue = val; - currentValueCt = 1; - currentValueIsRepeated = false; - thereIsABufferedValue = true; - } - - @Override - public long getAllocatedSize() { - return bitWriter.getCapacity(); - } - - @Override - public Encoding getEncoding() { - return RLE; - } - - @Override - public String memUsageString(String prefix) { - return bitWriter.memUsageString(prefix); - } - -} diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/ZeroIntegerValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/ZeroIntegerValuesReader.java similarity index 96% rename from parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/ZeroIntegerValuesReader.java rename to parquet-column/src/main/java/org/apache/parquet/column/values/rle/ZeroIntegerValuesReader.java index 8c78c388a7..f8ff8d0d98 100644 --- a/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/ZeroIntegerValuesReader.java +++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/ZeroIntegerValuesReader.java @@ -16,7 +16,7 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.parquet.column.values.boundedint; +package org.apache.parquet.column.values.rle; import java.io.IOException; import java.nio.ByteBuffer; diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/boundedint/TestBoundedColumns.java b/parquet-column/src/test/java/org/apache/parquet/column/values/boundedint/TestBoundedColumns.java deleted file mode 100644 index d1e43d283c..0000000000 --- a/parquet-column/src/test/java/org/apache/parquet/column/values/boundedint/TestBoundedColumns.java +++ /dev/null @@ -1,175 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.parquet.column.values.boundedint; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.ByteArrayOutputStream; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.Arrays; -import java.util.Random; - -import org.junit.Test; - -import org.apache.parquet.bytes.DirectByteBufferAllocator; - -public class TestBoundedColumns { - private final Random r = new Random(42L); - - @Test - public void testWriterRepeatNoRepeatAndRepeatUnderThreshold() throws IOException { - int[] ints = { - 1, 1, 1, 1, - 0, - 0, - 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, 2, // 16 2s - 1, - 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5 // 24 5s - }; - String[] result = {"1",b(1,3),b(4),"0",b(0,3),"0",b(0,3),"1",b(2,3),b(16),"0",b(1,3),"1",b(5,3),b(24)}; - compareOutput(7, ints, result); - } - - @Test - public void testWriterNoRepeat() throws IOException { - int bound = 7; - int[] ints = { 0, 1, 2, 3, 4, 5, 6, 7}; - String[] result = {"0",b(0,3),"0",b(1,3),"0",b(2,3),"0",b(3,3),"0",b(4,3),"0",b(5,3),"0",b(6,3),"0",b(7,3)}; - compareOutput(bound, ints, result); - } - - private void compareOutput(int bound, int[] ints, String[] result) throws IOException { - BoundedIntValuesWriter bicw = new BoundedIntValuesWriter(bound, 64*1024, 64*1024, new DirectByteBufferAllocator()); - for (int i : ints) { - bicw.writeInteger(i); - } - System.out.println(Arrays.toString(ints)); - System.out.println(Arrays.toString(result)); - byte[] byteArray = bicw.getBytes().toByteArray(); - assertEquals(concat(result), toBinaryString(byteArray, 4)); - BoundedIntValuesReader bicr = new BoundedIntValuesReader(bound); - bicr.initFromPage(1, ByteBuffer.wrap(byteArray), 0); - String expected = ""; - String got = ""; - for (int i : ints) { - expected += " " + i; - got += " " + bicr.readInteger(); - } - assertEquals(expected, got); - } - - private String concat(String[] result) { - String r = ""; - for (String string : result) { - r = string + r; - } - return r; - } - - private String b(int i) { - return b(i,8); - } - - private String b(int i, int size) { - String binaryString = Integer.toBinaryString(i); - while (binaryString.length() < size) { - binaryString = "0" + binaryString; - } - return binaryString; - } - - public static String toBinaryString(byte[] bytes) { - return toBinaryString(bytes, 0); - } - - private static String toBinaryString(byte[] bytes, int offset) { - String result = ""; - for (int i = offset; i < bytes.length; i++) { - int b = bytes[i] < 0 ? 256 + bytes[i] : bytes[i]; - String binaryString = Integer.toBinaryString(b); - while (binaryString.length() < 8) { - binaryString = "0" + binaryString; - } - result = binaryString + result; - } - return result; - } - - @Test - public void testSerDe() throws Exception { - int[] valuesPerStripe = new int[] { 50, 100, 700, 1, 200 }; - int totalValuesInStream = 0; - for (int v : valuesPerStripe) { - totalValuesInStream += v * 2; - } - - for (int bound = 1; bound < 8; bound++) { - System.out.println("bound: "+ bound); - ByteArrayOutputStream tmp = new ByteArrayOutputStream(); - - int[] stream = new int[totalValuesInStream]; - BoundedIntValuesWriter bicw = new BoundedIntValuesWriter(bound, 64 * 1024, 64*1024, new DirectByteBufferAllocator()); - int idx = 0; - for (int stripeNum = 0; stripeNum < valuesPerStripe.length; stripeNum++) { - int next = 0; - for (int i = 0; i < valuesPerStripe[stripeNum]; i++) { - int temp = r.nextInt(bound + 1); - while (next == temp) { - temp = r.nextInt(bound + 1); - } - next = temp; - stream[idx++] = next; - int ct; - if (r.nextBoolean()) { - stream[idx++] = ct = r.nextInt(1000) + 1; - } else { - stream[idx++] = ct = 1; - } - for (int j = 0; j < ct; j++) { - bicw.writeInteger(next); - } - } - bicw.getBytes().writeAllTo(tmp); - bicw.reset(); - } - tmp.close(); - - byte[] input = tmp.toByteArray(); - - BoundedIntValuesReader bicr = new BoundedIntValuesReader(bound); - idx = 0; - int offset = 0; - for (int stripeNum = 0; stripeNum < valuesPerStripe.length; stripeNum++) { - bicr.initFromPage(1, ByteBuffer.wrap(input), offset); - offset = bicr.getNextOffset(); - for (int i = 0; i < valuesPerStripe[stripeNum]; i++) { - int number = stream[idx++]; - int ct = stream[idx++]; - assertTrue(number <= bound); - assertTrue(ct > 0); - for (int j = 0; j < ct; j++) { - assertEquals("Failed on bound ["+bound+"], stripe ["+stripeNum+"], iteration ["+i+"], on count ["+ct+"]", number, bicr.readInteger()); - } - } - } - } - } -}