Skip to content

Commit

Permalink
Fix unsorted values.
Browse files Browse the repository at this point in the history
  • Loading branch information
rdblue committed Dec 20, 2024
1 parent 7312f19 commit cfb7304
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,12 @@
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.iceberg.relocated.com.google.common.annotations.VisibleForTesting;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.util.Pair;

class SerializedObject extends Variants.SerializedValue implements VariantObject {
Expand Down Expand Up @@ -52,6 +56,8 @@ static SerializedObject from(SerializedMetadata metadata, ByteBuffer value, int
private final Integer[] fieldIds;
private final int offsetSize;
private final int offsetListOffset;
private final int[] offsets;
private final int[] lengths;
private final int dataOffset;
private final VariantValue[] values;

Expand All @@ -66,8 +72,44 @@ private SerializedObject(SerializedMetadata metadata, ByteBuffer value, int head
this.fieldIdListOffset = Variants.HEADER_SIZE + numElementsSize;
this.fieldIds = new Integer[numElements];
this.offsetListOffset = fieldIdListOffset + (numElements * fieldIdSize);
this.offsets = new int[numElements];
this.lengths = new int[numElements];
this.dataOffset = offsetListOffset + ((1 + numElements) * offsetSize);
this.values = new VariantValue[numElements];

if (numElements > 0) {
initOffsetsAndLengths(numElements);
}
}

/**
 * Reads the offset list and derives the byte length of every field value.
 *
 * <p>Values may be stored in a different order than the field IDs, so the length of a value
 * cannot be taken from the next entry of the offset list. Instead, the distinct offsets (plus
 * the trailing entry that holds the total data length) are sorted, and each value is taken to
 * extend up to the next larger offset.
 *
 * @param numElements number of fields in this object
 */
private void initOffsetsAndLengths(int numElements) {
  // populate offsets list
  Map<Integer, Integer> offsetToLength = Maps.newHashMap();
  for (int index = 0; index < numElements; index += 1) {
    offsets[index] =
        VariantUtil.readLittleEndianUnsigned(
            value, offsetListOffset + (index * offsetSize), offsetSize);

    offsetToLength.put(offsets[index], 0);
  }

  // the entry after the last field offset is the total length of the value data
  int dataLength =
      VariantUtil.readLittleEndianUnsigned(
          value, offsetListOffset + (numElements * offsetSize), offsetSize);
  offsetToLength.put(dataLength, 0);

  // populate lengths list by sorting offsets
  List<Integer> sortedOffsets =
      offsetToLength.keySet().stream().sorted().collect(Collectors.toList());

  // Walk adjacent pairs of distinct offsets instead of assuming numElements + 1 entries: the map
  // collapses duplicate offsets, so fields that share an offset (or an offset equal to the data
  // length) would otherwise index past the end of the sorted list.
  for (int index = 0; index + 1 < sortedOffsets.size(); index += 1) {
    int offset = sortedOffsets.get(index);
    int length = sortedOffsets.get(index + 1) - offset;
    offsetToLength.put(offset, length);
  }

  // every field offset was added to the map above, so the lookup cannot miss
  for (int index = 0; index < lengths.length; index += 1) {
    lengths[index] = offsetToLength.get(offsets[index]);
  }
}

@VisibleForTesting
Expand Down Expand Up @@ -123,6 +165,7 @@ private int id(int index) {
VariantUtil.readLittleEndianUnsigned(
value, fieldIdListOffset + (index * fieldIdSize), fieldIdSize);
}

return fieldIds[index];
}

Expand All @@ -136,14 +179,9 @@ public VariantValue get(String name) {
}

if (null == values[index]) {
int offset =
VariantUtil.readLittleEndianUnsigned(
value, offsetListOffset + (index * offsetSize), offsetSize);
int next =
VariantUtil.readLittleEndianUnsigned(
value, offsetListOffset + ((1 + index) * offsetSize), offsetSize);
values[index] =
Variants.from(metadata, VariantUtil.slice(value, dataOffset + offset, next - offset));
Variants.from(
metadata, VariantUtil.slice(value, dataOffset + offsets[index], lengths[index]));
}

return values[index];
Expand Down Expand Up @@ -176,14 +214,7 @@ ByteBuffer sliceValue(int index) {
return ((Variants.Serialized) values[index]).buffer();
}

int offset =
VariantUtil.readLittleEndianUnsigned(
value, offsetListOffset + (index * offsetSize), offsetSize);
int next =
VariantUtil.readLittleEndianUnsigned(
value, offsetListOffset + ((1 + index) * offsetSize), offsetSize);

return VariantUtil.slice(value, dataOffset + offset, next - offset);
return VariantUtil.slice(value, dataOffset + offsets[index], lengths[index]);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,24 @@ public class TestSerializedObject {
private static final SerializedPrimitive TRUE = SerializedPrimitive.from(new byte[] {0b100});
private static final SerializedPrimitive DATE =
SerializedPrimitive.from(new byte[] {0b101100, (byte) 0xF4, 0x43, 0x00, 0x00});
// Serialized 3-field object whose values are laid out in a different order (c, a, b) than its
// field IDs (a, b, c); used to check that value lengths are not derived from offset-list order.
private static final byte[] UNSORTED_VALUES =
new byte[] {
0b10, // header byte (presumably the object type marker with 1-byte sizes -- confirm against the spec)
0x03, // 3 item object
0x00,
0x01,
0x02, // ascending key IDs (a, b, c)
0x02, // offset of a's value
0x04, // offset of b's value
0x00, // offset of c's value
0x06, // trailing offset entry: total length of the value data that follows
0b1100,
0x03, // c = 3 (int8), stored at offset 0
0b1100,
0x01, // a = 1 (int8), stored at offset 2
0b1100,
0x02 // b = 2 (int8), stored at offset 4
};

private final Random random = new Random(198725);

Expand Down Expand Up @@ -86,6 +104,27 @@ public void testSimpleObject() {
assertThat(object.get("d")).isEqualTo(null);
}

@Test
public void testUnsortedValues() {
  // names are sorted in the metadata; the object's values are stored out of field order
  ByteBuffer metadataBuffer =
      VariantTestUtil.createMetadata(Sets.newHashSet("a", "b", "c"), true /* sort names */);
  SerializedObject unsorted =
      SerializedObject.from(SerializedMetadata.from(metadataBuffer), UNSORTED_VALUES);

  assertThat(unsorted.type()).isEqualTo(PhysicalType.OBJECT);
  assertThat(unsorted.numElements()).isEqualTo(3);

  // each field must resolve to its own value regardless of where the value is stored
  VariantValue fieldA = unsorted.get("a");
  assertThat(fieldA.type()).isEqualTo(PhysicalType.INT8);
  assertThat(fieldA.asPrimitive().get()).isEqualTo((byte) 1);

  VariantValue fieldB = unsorted.get("b");
  assertThat(fieldB.type()).isEqualTo(PhysicalType.INT8);
  assertThat(fieldB.asPrimitive().get()).isEqualTo((byte) 2);

  VariantValue fieldC = unsorted.get("c");
  assertThat(fieldC.type()).isEqualTo(PhysicalType.INT8);
  assertThat(fieldC.asPrimitive().get()).isEqualTo((byte) 3);

  // a name that is not a field of the object yields null
  assertThat(unsorted.get("d")).isEqualTo(null);
}

@Test
public void testOutOfOrderKeys() {
Map<String, VariantValue> data = ImmutableMap.of("b", I2, "a", I1, "c", I3);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -395,15 +395,17 @@ public void testThreeByteFieldIds(boolean sortFieldNames) {
assertThat(object.get("ZZ").asPrimitive().get()).isEqualTo(new BigDecimal("12.21"));
}

private static VariantValue roundTripMinimalBuffer(ShreddedObject object, SerializedMetadata metadata) {
// Serializes the object into a buffer of exactly object.sizeInBytes() bytes, written at
// position 0, and reads it back through Variants.from.
private static VariantValue roundTripMinimalBuffer(
    ShreddedObject object, SerializedMetadata metadata) {
  int exactSize = object.sizeInBytes();
  ByteBuffer buffer = ByteBuffer.allocate(exactSize);
  buffer.order(ByteOrder.LITTLE_ENDIAN);

  object.writeTo(buffer, 0);
  return Variants.from(metadata, buffer);
}

private static VariantValue roundTripLargeBuffer(ShreddedObject object, SerializedMetadata metadata) {
private static VariantValue roundTripLargeBuffer(
ShreddedObject object, SerializedMetadata metadata) {
ByteBuffer serialized =
ByteBuffer.allocate(1000 + object.sizeInBytes()).order(ByteOrder.LITTLE_ENDIAN);
object.writeTo(serialized, 300);
Expand Down

0 comments on commit cfb7304

Please sign in to comment.