Skip to content

Commit f213541

Browse files
Spark: Spark3 ZOrder Rewrite Strategy (#3983)
Adds initial implementation of a ZOrder File Rewrite Strategy. Allows users to use a multidimensional sort algorithm for organizing data.
1 parent 0b48f51 commit f213541

File tree

13 files changed

+1757
-1
lines changed

13 files changed

+1757
-1
lines changed

api/src/main/java/org/apache/iceberg/actions/RewriteDataFiles.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,15 @@ default RewriteDataFiles sort(SortOrder sortOrder) {
129129
throw new UnsupportedOperationException("SORT Rewrite Strategy not implemented for this framework");
130130
}
131131

132+
/**
133+
* Choose Z-ORDER as a strategy for this rewrite operation with a specified list of columns to use
134+
* @param columns Columns to be used to generate Z-Values
135+
* @return this for method chaining
136+
*/
137+
default RewriteDataFiles zOrder(String... columns) {
138+
throw new UnsupportedOperationException("Z-ORDER Rewrite Strategy not implemented for this framework");
139+
}
140+
132141
/**
133142
* A user provided filter for determining which files will be considered by the rewrite strategy. This will be used
134143
* in addition to whatever rules the rewrite strategy generates. For example this would be used for providing a

api/src/main/java/org/apache/iceberg/util/ByteBuffers.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import java.nio.ByteBuffer;
2323
import java.util.Arrays;
24+
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
2425

2526
public class ByteBuffers {
2627

@@ -46,6 +47,16 @@ public static byte[] toByteArray(ByteBuffer buffer) {
4647
}
4748
}
4849

50+
public static ByteBuffer reuse(ByteBuffer reuse, int length) {
51+
Preconditions.checkArgument(reuse.hasArray(), "Cannot reuse a buffer not backed by an array");
52+
Preconditions.checkArgument(reuse.arrayOffset() == 0, "Cannot reuse a buffer whose array offset is not 0");
53+
Preconditions.checkArgument(reuse.capacity() == length,
54+
"Canout use a buffer whose capacity (%s) is not equal to the requested length (%s)", length, reuse.capacity());
55+
reuse.position(0);
56+
reuse.limit(length);
57+
return reuse;
58+
}
59+
4960
public static ByteBuffer copy(ByteBuffer buffer) {
5061
if (buffer == null) {
5162
return null;
Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
21+
package org.apache.iceberg.util;
22+
23+
import java.nio.ByteBuffer;
24+
import java.util.Random;
25+
import java.util.concurrent.TimeUnit;
26+
import org.openjdk.jmh.annotations.Benchmark;
27+
import org.openjdk.jmh.annotations.BenchmarkMode;
28+
import org.openjdk.jmh.annotations.Fork;
29+
import org.openjdk.jmh.annotations.Measurement;
30+
import org.openjdk.jmh.annotations.Mode;
31+
import org.openjdk.jmh.annotations.Scope;
32+
import org.openjdk.jmh.annotations.Setup;
33+
import org.openjdk.jmh.annotations.State;
34+
import org.openjdk.jmh.annotations.Threads;
35+
import org.openjdk.jmh.annotations.Timeout;
36+
import org.openjdk.jmh.infra.Blackhole;
37+
38+
@Fork(1)
39+
@State(Scope.Benchmark)
40+
@Measurement(iterations = 5)
41+
@BenchmarkMode(Mode.SingleShotTime)
42+
@Timeout(time = 1000, timeUnit = TimeUnit.HOURS)
43+
public class ZOrderByteUtilsBenchmark {
44+
45+
private static final int NUM_ENTRIES = 10000000;
46+
47+
private byte[][][] fourColumnInput;
48+
private byte[][][] threeColumnInput;
49+
private byte[][][] twoColumnInput;
50+
51+
@Setup
52+
public void setupBench() {
53+
Random rand = new Random(42);
54+
fourColumnInput = new byte[NUM_ENTRIES][][];
55+
threeColumnInput = new byte[NUM_ENTRIES][][];
56+
twoColumnInput = new byte[NUM_ENTRIES][][];
57+
for (int i = 0; i < NUM_ENTRIES; i++) {
58+
fourColumnInput[i] = new byte[4][];
59+
threeColumnInput[i] = new byte[3][];
60+
twoColumnInput[i] = new byte[2][];
61+
for (int j = 0; j < 4; j++) {
62+
byte[] value = ByteBuffer.allocate(Long.BYTES).putLong(rand.nextLong()).array();
63+
if (j < 2) {
64+
twoColumnInput[i][j] = value;
65+
}
66+
if (j < 3) {
67+
threeColumnInput[i][j] = value;
68+
}
69+
fourColumnInput[i][j] = value;
70+
}
71+
}
72+
}
73+
74+
@Benchmark
75+
@Threads(1)
76+
public void interleaveValuesFourColumns(Blackhole blackhole) {
77+
int outputSize = ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE * 4;
78+
ByteBuffer outputBuffer = ByteBuffer.allocate(outputSize);
79+
80+
for (int i = 0; i < fourColumnInput.length; i++) {
81+
byte[] interleavedBytes = ZOrderByteUtils.interleaveBits(fourColumnInput[i], outputSize, outputBuffer);
82+
blackhole.consume(interleavedBytes);
83+
}
84+
}
85+
86+
@Benchmark
87+
@Threads(1)
88+
public void interleaveValuesThreeColumns(Blackhole blackhole) {
89+
int outputSize = ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE * 3;
90+
ByteBuffer outputBuffer = ByteBuffer.allocate(outputSize);
91+
92+
for (int i = 0; i < fourColumnInput.length; i++) {
93+
byte[] interleavedBytes = ZOrderByteUtils.interleaveBits(threeColumnInput[i], outputSize, outputBuffer);
94+
blackhole.consume(interleavedBytes);
95+
}
96+
}
97+
98+
@Benchmark
99+
@Threads(1)
100+
public void interleaveValuesTwoColumns(Blackhole blackhole) {
101+
int outputSize = ZOrderByteUtils.PRIMITIVE_BUFFER_SIZE * 2;
102+
ByteBuffer outputBuffer = ByteBuffer.allocate(outputSize);
103+
104+
for (int i = 0; i < fourColumnInput.length; i++) {
105+
byte[] interleavedBytes = ZOrderByteUtils.interleaveBits(twoColumnInput[i], outputSize, outputBuffer);
106+
blackhole.consume(interleavedBytes);
107+
}
108+
}
109+
110+
@Benchmark
111+
@Threads(1)
112+
public void interleaveValuesFourColumns8ByteOutput(Blackhole blackhole) {
113+
int outputSize = 8;
114+
ByteBuffer outputBuffer = ByteBuffer.allocate(outputSize);
115+
116+
for (int i = 0; i < fourColumnInput.length; i++) {
117+
byte[] interleavedBytes = ZOrderByteUtils.interleaveBits(fourColumnInput[i], outputSize, outputBuffer);
118+
blackhole.consume(interleavedBytes);
119+
}
120+
}
121+
}
Lines changed: 215 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,215 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iceberg.util;
21+
22+
import java.nio.ByteBuffer;
23+
import java.nio.CharBuffer;
24+
import java.nio.charset.CharsetEncoder;
25+
import java.nio.charset.StandardCharsets;
26+
import java.util.Arrays;
27+
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
28+
29+
/**
30+
* Within Z-Ordering the byte representations of objects being compared must be ordered,
31+
* this requires several types to be transformed when converted to bytes. The goal is to
32+
* map object's whose byte representation are not lexicographically ordered into representations
33+
* that are lexicographically ordered. Bytes produced should be compared lexicographically as
34+
* unsigned bytes, big-endian.
35+
* <p>
36+
* All types except for String are stored within an 8 Byte Buffer
37+
* <p>
38+
* Most of these techniques are derived from
39+
* https://aws.amazon.com/blogs/database/z-order-indexing-for-multifaceted-queries-in-amazon-dynamodb-part-2/
40+
* <p>
41+
* Some implementation is taken from
42+
* https://github.com/apache/hbase/blob/master/hbase-common/src/main/java/org/apache/hadoop/hbase/util/OrderedBytes.java
43+
*/
44+
public class ZOrderByteUtils {
45+
46+
public static final int PRIMITIVE_BUFFER_SIZE = 8;
47+
48+
private ZOrderByteUtils() {
49+
}
50+
51+
static ByteBuffer allocatePrimitiveBuffer() {
52+
return ByteBuffer.allocate(PRIMITIVE_BUFFER_SIZE);
53+
}
54+
55+
/**
56+
* Signed ints do not have their bytes in magnitude order because of the sign bit.
57+
* To fix this, flip the sign bit so that all negatives are ordered before positives. This essentially
58+
* shifts the 0 value so that we don't break our ordering when we cross the new 0 value.
59+
*/
60+
public static ByteBuffer intToOrderedBytes(int val, ByteBuffer reuse) {
61+
ByteBuffer bytes = ByteBuffers.reuse(reuse, PRIMITIVE_BUFFER_SIZE);
62+
bytes.putLong(((long) val) ^ 0x8000000000000000L);
63+
return bytes;
64+
}
65+
66+
/**
67+
* Signed longs are treated the same as the signed ints in {@link #intToOrderedBytes(int, ByteBuffer)}
68+
*/
69+
public static ByteBuffer longToOrderedBytes(long val, ByteBuffer reuse) {
70+
ByteBuffer bytes = ByteBuffers.reuse(reuse, PRIMITIVE_BUFFER_SIZE);
71+
bytes.putLong(val ^ 0x8000000000000000L);
72+
return bytes;
73+
}
74+
75+
/**
76+
* Signed shorts are treated the same as the signed ints in {@link #intToOrderedBytes(int, ByteBuffer)}
77+
*/
78+
public static ByteBuffer shortToOrderedBytes(short val, ByteBuffer reuse) {
79+
ByteBuffer bytes = ByteBuffers.reuse(reuse, PRIMITIVE_BUFFER_SIZE);
80+
bytes.putLong(((long) val) ^ 0x8000000000000000L);
81+
return bytes;
82+
}
83+
84+
/**
85+
* Signed tiny ints are treated the same as the signed ints in {@link #intToOrderedBytes(int, ByteBuffer)}
86+
*/
87+
public static ByteBuffer tinyintToOrderedBytes(byte val, ByteBuffer reuse) {
88+
ByteBuffer bytes = ByteBuffers.reuse(reuse, PRIMITIVE_BUFFER_SIZE);
89+
bytes.putLong(((long) val) ^ 0x8000000000000000L);
90+
return bytes;
91+
}
92+
93+
/**
94+
* IEEE 754 :
95+
* “If two floating-point numbers in the same format are ordered (say, x {@literal <} y),
96+
* they are ordered the same way when their bits are reinterpreted as sign-magnitude integers.”
97+
*
98+
* Which means floats can be treated as sign magnitude integers which can then be converted into lexicographically
99+
* comparable bytes
100+
*/
101+
public static ByteBuffer floatToOrderedBytes(float val, ByteBuffer reuse) {
102+
ByteBuffer bytes = ByteBuffers.reuse(reuse, PRIMITIVE_BUFFER_SIZE);
103+
long lval = Double.doubleToLongBits(val);
104+
lval ^= ((lval >> (Integer.SIZE - 1)) | Long.MIN_VALUE);
105+
bytes.putLong(lval);
106+
return bytes;
107+
}
108+
109+
/**
110+
* Doubles are treated the same as floats in {@link #floatToOrderedBytes(float, ByteBuffer)}
111+
*/
112+
public static ByteBuffer doubleToOrderedBytes(double val, ByteBuffer reuse) {
113+
ByteBuffer bytes = ByteBuffers.reuse(reuse, PRIMITIVE_BUFFER_SIZE);
114+
long lval = Double.doubleToLongBits(val);
115+
lval ^= ((lval >> (Integer.SIZE - 1)) | Long.MIN_VALUE);
116+
bytes.putLong(lval);
117+
return bytes;
118+
}
119+
120+
/**
121+
* Strings are lexicographically sortable BUT if different byte array lengths will
122+
* ruin the Z-Ordering. (ZOrder requires that a given column contribute the same number of bytes every time).
123+
* This implementation just uses a set size to for all output byte representations. Truncating longer strings
124+
* and right padding 0 for shorter strings.
125+
*/
126+
public static ByteBuffer stringToOrderedBytes(String val, int length, ByteBuffer reuse, CharsetEncoder encoder) {
127+
Preconditions.checkArgument(encoder.charset().equals(StandardCharsets.UTF_8),
128+
"Cannot use an encoder not using UTF_8 as it's Charset");
129+
130+
ByteBuffer bytes = ByteBuffers.reuse(reuse, length);
131+
Arrays.fill(bytes.array(), 0, length, (byte) 0x00);
132+
if (val != null) {
133+
CharBuffer inputBuffer = CharBuffer.wrap(val);
134+
encoder.encode(inputBuffer, bytes, true);
135+
}
136+
return bytes;
137+
}
138+
139+
/**
140+
* Return a bytebuffer with the given bytes truncated to length, or filled with 0's to length depending on whether
141+
* the given bytes are larger or smaller than the given length.
142+
*/
143+
public static ByteBuffer byteTruncateOrFill(byte[] val, int length, ByteBuffer reuse) {
144+
ByteBuffer bytes = ByteBuffers.reuse(reuse, length);
145+
if (val.length < length) {
146+
bytes.put(val, 0, val.length);
147+
Arrays.fill(bytes.array(), val.length, length, (byte) 0x00);
148+
} else {
149+
bytes.put(val, 0, length);
150+
}
151+
return bytes;
152+
}
153+
154+
static byte[] interleaveBits(byte[][] columnsBinary, int interleavedSize) {
155+
return interleaveBits(columnsBinary, interleavedSize, ByteBuffer.allocate(interleavedSize));
156+
}
157+
158+
/**
159+
* Interleave bits using a naive loop. Variable length inputs are allowed but to get a consistent ordering it is
160+
* required that every column contribute the same number of bytes in each invocation. Bits are interleaved from all
161+
* columns that have a bit available at that position. Once a Column has no more bits to produce it is skipped in the
162+
* interleaving.
163+
* @param columnsBinary an array of ordered byte representations of the columns being ZOrdered
164+
* @param interleavedSize the number of bytes to use in the output
165+
* @return the columnbytes interleaved
166+
*/
167+
public static byte[] interleaveBits(byte[][] columnsBinary, int interleavedSize, ByteBuffer reuse) {
168+
byte[] interleavedBytes = reuse.array();
169+
Arrays.fill(interleavedBytes, 0, interleavedSize, (byte) 0x00);
170+
171+
int sourceColumn = 0;
172+
int sourceByte = 0;
173+
int sourceBit = 7;
174+
int interleaveByte = 0;
175+
int interleaveBit = 7;
176+
177+
while (interleaveByte < interleavedSize) {
178+
// Take the source bit from source byte and move it to the output bit position
179+
interleavedBytes[interleaveByte] |=
180+
(columnsBinary[sourceColumn][sourceByte] & 1 << sourceBit) >>> sourceBit << interleaveBit;
181+
--interleaveBit;
182+
183+
// Check if an output byte has been completed
184+
if (interleaveBit == -1) {
185+
// Move to the next output byte
186+
interleaveByte++;
187+
// Move to the highest order bit of the new output byte
188+
interleaveBit = 7;
189+
}
190+
191+
// Check if the last output byte has been completed
192+
if (interleaveByte == interleavedSize) {
193+
break;
194+
}
195+
196+
// Find the next source bit to interleave
197+
do {
198+
// Move to next column
199+
++sourceColumn;
200+
if (sourceColumn == columnsBinary.length) {
201+
// If the last source column was used, reset to next bit of first column
202+
sourceColumn = 0;
203+
--sourceBit;
204+
if (sourceBit == -1) {
205+
// If the last bit of the source byte was used, reset to the highest bit of the next byte
206+
sourceByte++;
207+
sourceBit = 7;
208+
}
209+
}
210+
} while (columnsBinary[sourceColumn].length <= sourceByte);
211+
}
212+
return interleavedBytes;
213+
}
214+
215+
}

0 commit comments

Comments
 (0)