From 2cd92c344aabd6cb93a2c14122149f9e2ad1884c Mon Sep 17 00:00:00 2001 From: Alexey Ponkin Date: Mon, 14 Nov 2016 11:06:08 +0300 Subject: [PATCH 1/3] SPARK-18252: Using RoaringBitmap for bloom filters --- common/sketch/pom.xml | 4 + .../apache/spark/util/sketch/BitArray.java | 116 --------------- .../apache/spark/util/sketch/BloomFilter.java | 1 + .../spark/util/sketch/BloomFilterImpl.java | 84 +++++------ .../apache/spark/util/sketch/Murmur3_128.java | 135 +++++++++++++++++ .../spark/util/sketch/RoaringBitmapArray.java | 136 ++++++++++++++++++ .../spark/util/sketch/Murmur3_128Suite.java | 30 ++++ .../spark/util/sketch/BloomFilterSuite.scala | 37 ++++- ...te.scala => RoaringBitmapArraySuite.scala} | 37 ++--- 9 files changed, 396 insertions(+), 184 deletions(-) delete mode 100644 common/sketch/src/main/java/org/apache/spark/util/sketch/BitArray.java create mode 100644 common/sketch/src/main/java/org/apache/spark/util/sketch/Murmur3_128.java create mode 100644 common/sketch/src/main/java/org/apache/spark/util/sketch/RoaringBitmapArray.java create mode 100644 common/sketch/src/test/java/org/apache/spark/util/sketch/Murmur3_128Suite.java rename common/sketch/src/test/scala/org/apache/spark/util/sketch/{BitArraySuite.scala => RoaringBitmapArraySuite.scala} (73%) diff --git a/common/sketch/pom.xml b/common/sketch/pom.xml index 626f023a5b99..ec0bd020a1ed 100644 --- a/common/sketch/pom.xml +++ b/common/sketch/pom.xml @@ -39,6 +39,10 @@ org.apache.spark spark-tags_${scala.binary.version} + + org.roaringbitmap + RoaringBitmap + diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/BitArray.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/BitArray.java deleted file mode 100644 index 480a0a79db32..000000000000 --- a/common/sketch/src/main/java/org/apache/spark/util/sketch/BitArray.java +++ /dev/null @@ -1,116 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.spark.util.sketch; - -import java.io.DataInputStream; -import java.io.DataOutputStream; -import java.io.IOException; -import java.util.Arrays; - -final class BitArray { - private final long[] data; - private long bitCount; - - static int numWords(long numBits) { - if (numBits <= 0) { - throw new IllegalArgumentException("numBits must be positive, but got " + numBits); - } - long numWords = (long) Math.ceil(numBits / 64.0); - if (numWords > Integer.MAX_VALUE) { - throw new IllegalArgumentException("Can't allocate enough space for " + numBits + " bits"); - } - return (int) numWords; - } - - BitArray(long numBits) { - this(new long[numWords(numBits)]); - } - - private BitArray(long[] data) { - this.data = data; - long bitCount = 0; - for (long word : data) { - bitCount += Long.bitCount(word); - } - this.bitCount = bitCount; - } - - /** Returns true if the bit changed value. */ - boolean set(long index) { - if (!get(index)) { - data[(int) (index >>> 6)] |= (1L << index); - bitCount++; - return true; - } - return false; - } - - boolean get(long index) { - return (data[(int) (index >>> 6)] & (1L << index)) != 0; - } - - /** Number of bits */ - long bitSize() { - return (long) data.length * Long.SIZE; - } - - /** Number of set bits (1s) */ - long cardinality() { - return bitCount; - } - - /** Combines the two BitArrays using bitwise OR. */ - void putAll(BitArray array) { - assert data.length == array.data.length : "BitArrays must be of equal length when merging"; - long bitCount = 0; - for (int i = 0; i < data.length; i++) { - data[i] |= array.data[i]; - bitCount += Long.bitCount(data[i]); - } - this.bitCount = bitCount; - } - - void writeTo(DataOutputStream out) throws IOException { - out.writeInt(data.length); - for (long datum : data) { - out.writeLong(datum); - } - } - - static BitArray readFrom(DataInputStream in) throws IOException { - int numWords = in.readInt(); - long[] data = new long[numWords]; - for (int i = 0; i < numWords; i++) { - data[i] = in.readLong(); - } - return new BitArray(data); - } - - @Override - public boolean equals(Object other) { - if (this == other) return true; - if (other == null || !(other instanceof BitArray)) return false; - BitArray that = (BitArray) other; - return Arrays.equals(data, that.data); - } - - @Override - public int hashCode() { - return Arrays.hashCode(data); - } -} diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java index c0b425e72959..b7b01ae75586 100644 --- a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java @@ -233,4 +233,5 @@ public static BloomFilter create(long expectedNumItems, long numBits) { return new BloomFilterImpl(optimalNumOfHashFunctions(expectedNumItems, numBits), numBits); } + } diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java index 92c28bcb56a5..1119658bf225 100644 --- a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java @@ -23,13 +23,13 @@ class BloomFilterImpl extends BloomFilter implements Serializable { private int numHashFunctions; - private BitArray bits; + private RoaringBitmapArray bits; BloomFilterImpl(int numHashFunctions, long numBits) { - this(new BitArray(numBits), numHashFunctions); + this(new RoaringBitmapArray(numBits), numHashFunctions); } - private BloomFilterImpl(BitArray bits, int numHashFunctions) { + private BloomFilterImpl(RoaringBitmapArray bits, int numHashFunctions) { this.bits = bits; this.numHashFunctions = numHashFunctions; } @@ -48,7 +48,7 @@ public boolean equals(Object other) { BloomFilterImpl that = (BloomFilterImpl) other; - return this.numHashFunctions == that.numHashFunctions && this.bits.equals(that.bits); + return (this.numHashFunctions == that.numHashFunctions) && this.bits.equals(that.bits); } @Override @@ -84,18 +84,19 @@ public boolean putString(String item) { @Override public boolean putBinary(byte[] item) { - int h1 = Murmur3_x86_32.hashUnsafeBytes(item, Platform.BYTE_ARRAY_OFFSET, item.length, 0); - int h2 = Murmur3_x86_32.hashUnsafeBytes(item, Platform.BYTE_ARRAY_OFFSET, item.length, h1); + // Strategy is taken from guava`s BloomFilterStrategies.MURMUR128_MITZ_64 + long[] hashes = new long[2]; + Murmur3_128.hashBytes(item, 0, hashes); + long h1 = hashes[0]; + long h2 = hashes[1]; long bitSize = bits.bitSize(); boolean bitsChanged = false; + long combinedHash = h1; for (int i = 1; i <= numHashFunctions; i++) { - int combinedHash = h1 + (i * h2); - // Flip all the bits if it's negative (guaranteed positive number) - if (combinedHash < 0) { - combinedHash = ~combinedHash; - } - bitsChanged |= bits.set(combinedHash % bitSize); + // Make combinedHash positive and indexable + bitsChanged |= bits.set((combinedHash & Long.MAX_VALUE) % bitSize); + combinedHash += h2; } return bitsChanged; } @@ -107,61 +108,59 @@ public boolean mightContainString(String item) { @Override public boolean mightContainBinary(byte[] item) { - int h1 = Murmur3_x86_32.hashUnsafeBytes(item, Platform.BYTE_ARRAY_OFFSET, item.length, 0); - int h2 = Murmur3_x86_32.hashUnsafeBytes(item, Platform.BYTE_ARRAY_OFFSET, item.length, h1); + // Strategy is taken from guava`s BloomFilterStrategies.MURMUR128_MITZ_64 + long[] hashes = new long[2]; + Murmur3_128.hashBytes(item, 0, hashes); + + long h1 = hashes[0]; + long h2 = hashes[1]; long bitSize = bits.bitSize(); + long combinedHash = h1; for (int i = 1; i <= numHashFunctions; i++) { - int combinedHash = h1 + (i * h2); - // Flip all the bits if it's negative (guaranteed positive number) - if (combinedHash < 0) { - combinedHash = ~combinedHash; - } - if (!bits.get(combinedHash % bitSize)) { + // Make combinedHash positive and indexable + if (!bits.get((combinedHash & Long.MAX_VALUE) % bitSize)) { return false; } + combinedHash += h2; } return true; } @Override public boolean putLong(long item) { - // Here we first hash the input long element into 2 int hash values, h1 and h2, then produce n - // hash values by `h1 + i * h2` with 1 <= i <= numHashFunctions. - // Note that `CountMinSketch` use a different strategy, it hash the input long element with - // every i to produce n hash values. - // TODO: the strategy of `CountMinSketch` looks more advanced, should we follow it here? - int h1 = Murmur3_x86_32.hashLong(item, 0); - int h2 = Murmur3_x86_32.hashLong(item, h1); + // Strategy is taken from guava`s BloomFilterStrategies.MURMUR128_MITZ_64 + long[] hashes = new long[2]; + Murmur3_128.hashLong(item, 0, hashes); + long h1 = hashes[0]; + long h2 = hashes[1]; long bitSize = bits.bitSize(); boolean bitsChanged = false; + long combinedHash = h1; for (int i = 1; i <= numHashFunctions; i++) { - int combinedHash = h1 + (i * h2); - // Flip all the bits if it's negative (guaranteed positive number) - if (combinedHash < 0) { - combinedHash = ~combinedHash; - } - bitsChanged |= bits.set(combinedHash % bitSize); + // Make combinedHash positive and indexable + bitsChanged |= bits.set((combinedHash & Long.MAX_VALUE) % bitSize); + combinedHash += h2; } return bitsChanged; } @Override public boolean mightContainLong(long item) { - int h1 = Murmur3_x86_32.hashLong(item, 0); - int h2 = Murmur3_x86_32.hashLong(item, h1); + // Strategy is taken from guava`s BloomFilterStrategies.MURMUR128_MITZ_64 + long[] hashes = new long[2]; + Murmur3_128.hashLong(item, 0, hashes); + long h1 = hashes[0]; + long h2 = hashes[1]; long bitSize = bits.bitSize(); + long combinedHash = h1; for (int i = 1; i <= numHashFunctions; i++) { - int combinedHash = h1 + (i * h2); - // Flip all the bits if it's negative (guaranteed positive number) - if (combinedHash < 0) { - combinedHash = ~combinedHash; - } - if (!bits.get(combinedHash % bitSize)) { + if (!bits.get((combinedHash & Long.MAX_VALUE) % bitSize)) { return false; } + combinedHash += h2; } return true; } @@ -238,7 +237,7 @@ private void readFrom0(InputStream in) throws IOException { } this.numHashFunctions = dis.readInt(); - this.bits = BitArray.readFrom(dis); + this.bits = RoaringBitmapArray.readFrom(dis); } public static BloomFilterImpl readFrom(InputStream in) throws IOException { @@ -254,4 +253,5 @@ private void writeObject(ObjectOutputStream out) throws IOException { private void readObject(ObjectInputStream in) throws IOException { readFrom0(in); } + } diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/Murmur3_128.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/Murmur3_128.java new file mode 100644 index 000000000000..31041b4a1436 --- /dev/null +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/Murmur3_128.java @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.sketch; + +/** + * 128-bit Murmur3 hasher. + * Best perfomance is on x86_64 platform + * Based on implementation https://github.com/aappleby/smhasher/blob/master/src/MurmurHash3.cpp + */ +final class Murmur3_128 { + + private static final long C1 = 0x87c37b91114253d5L; + private static final long C2 = 0x4cf5ad432745937fL; + + static void hashBytes(byte[] data, long seed, long[] hashes) { + hash(data, Platform.BYTE_ARRAY_OFFSET, data.length, seed, hashes); + } + + static void hashLong(long data, long seed, long[] hashes) { + hash(new long[]{data}, Platform.LONG_ARRAY_OFFSET, 8, seed, hashes); // 8 - long`s size in bytes + } + + @SuppressWarnings("fallthrough") + private static void hash(Object key, int offset, int length, long seed, long[] result) { + long h1 = seed & 0x00000000FFFFFFFFL; + long h2 = seed & 0x00000000FFFFFFFFL; + + int roundedEnd = offset + (length & 0xFFFFFFF0); // round down to 16 byte block + for (int i=offset; i>> 33; + k *= 0xff51afd7ed558ccdL; + k ^= k >>> 33; + k *= 0xc4ceb9fe1a85ec53L; + k ^= k >>> 33; + return k; + } + + private static long mixK1(long k1) { + k1 *= C1; + k1 = Long.rotateLeft(k1,31); + k1 *= C2; + return k1; + } + + private static long mixK2(long k2) { + k2 *= C2; + k2 = Long.rotateLeft(k2,33); + k2 *= C1; + return k2; + } + +} \ No newline at end of file diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/RoaringBitmapArray.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/RoaringBitmapArray.java new file mode 100644 index 000000000000..be16d80edf2c --- /dev/null +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/RoaringBitmapArray.java @@ -0,0 +1,136 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.spark.util.sketch; + +import org.roaringbitmap.RoaringBitmap; + +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.util.Arrays; + +class RoaringBitmapArray { + + private final RoaringBitmap[] data; + private long bitCount; // number of 1`s in bitset + private final long numBits; // total number of available bits + + private static int numOfBuckets(long numBits) { + if (numBits <= 0) { + throw new IllegalArgumentException("numBits must be positive, but got " + numBits); + } + return (int) Math.ceil(numBits / (double)Integer.MAX_VALUE); + } + + private static RoaringBitmap[] initialVector(int numOfBuckets){ + RoaringBitmap[] vector = new RoaringBitmap[numOfBuckets]; + for(int i = 0;i Boolean = _ % 2 == 0 + val even = allBytes.filter(isEven) + val odd = allBytes.filterNot(isEven) + // insert first `numInsertion` items. + even.foreach(filter.put) + + // false negative is not allowed. + assert(even.forall(filter.mightContain)) + + // The number of inserted items doesn't exceed `expectedNumItems`, so the `expectedFpp` + // should not be significantly higher than the one we passed in to create this bloom filter. + assert(filter.expectedFpp() - fpp < EPSILON) + + val errorCount = odd.count(filter.mightContain) + // Also check the actual fpp is not significantly higher than we expected. + val actualFpp = errorCount.toDouble / odd.length + assert(actualFpp - fpp < EPSILON) + + checkSerDe(filter) + } + } diff --git a/common/sketch/src/test/scala/org/apache/spark/util/sketch/BitArraySuite.scala b/common/sketch/src/test/scala/org/apache/spark/util/sketch/RoaringBitmapArraySuite.scala similarity index 73% rename from common/sketch/src/test/scala/org/apache/spark/util/sketch/BitArraySuite.scala rename to common/sketch/src/test/scala/org/apache/spark/util/sketch/RoaringBitmapArraySuite.scala index ff728f0ebcb8..16ded6af3a87 100644 --- a/common/sketch/src/test/scala/org/apache/spark/util/sketch/BitArraySuite.scala +++ b/common/sketch/src/test/scala/org/apache/spark/util/sketch/RoaringBitmapArraySuite.scala @@ -21,34 +21,17 @@ import scala.util.Random import org.scalatest.FunSuite // scalastyle:ignore funsuite -class BitArraySuite extends FunSuite { // scalastyle:ignore funsuite +class RoaringBitmapArraySuite extends FunSuite { // scalastyle:ignore funsuite - test("error case when create BitArray") { - intercept[IllegalArgumentException](new BitArray(0)) - intercept[IllegalArgumentException](new BitArray(64L * Integer.MAX_VALUE + 1)) - } - - test("bitSize") { - assert(new BitArray(64).bitSize() == 64) - // BitArray is word-aligned, so 65~128 bits need 2 long to store, which is 128 bits. - assert(new BitArray(65).bitSize() == 128) - assert(new BitArray(127).bitSize() == 128) - assert(new BitArray(128).bitSize() == 128) - } - - test("set") { - val bitArray = new BitArray(64) - assert(bitArray.set(1)) - // Only returns true if the bit changed. - assert(!bitArray.set(1)) - assert(bitArray.set(2)) + test("error case when create RoaringBitmapArray") { + intercept[IllegalArgumentException](new RoaringBitmapArray(0)) } test("normal operation") { // use a fixed seed to make the test predictable. val r = new Random(37) - val bitArray = new BitArray(320) + val bitArray = new RoaringBitmapArray(320) val indexes = (1 to 100).map(_ => r.nextInt(320).toLong).distinct indexes.foreach(bitArray.set) @@ -56,12 +39,20 @@ class BitArraySuite extends FunSuite { // scalastyle:ignore funsuite assert(bitArray.cardinality() == indexes.length) } + test("set") { + val bitArray = new RoaringBitmapArray(64) + assert(bitArray.set(1)) + // Only returns true if the bit changed. + assert(!bitArray.set(1)) + assert(bitArray.set(2)) + } + test("merge") { // use a fixed seed to make the test predictable. val r = new Random(37) - val bitArray1 = new BitArray(64 * 6) - val bitArray2 = new BitArray(64 * 6) + val bitArray1 = new RoaringBitmapArray(64 * 6) + val bitArray2 = new RoaringBitmapArray(64 * 6) val indexes1 = (1 to 100).map(_ => r.nextInt(64 * 6).toLong).distinct val indexes2 = (1 to 100).map(_ => r.nextInt(64 * 6).toLong).distinct From 66f58d11ecd9d16054619fec94f413fd3b62d9a0 Mon Sep 17 00:00:00 2001 From: Alexey Ponkin Date: Thu, 17 Nov 2016 23:32:44 +0300 Subject: [PATCH 2/3] SPARK-18252: Fixing review comments --- .../apache/spark/util/sketch/BloomFilter.java | 5 +- .../spark/util/sketch/BloomFilterImpl.java | 3 +- .../apache/spark/util/sketch/Murmur3_128.java | 219 ++++++++++-------- .../spark/util/sketch/RoaringBitmapArray.java | 212 +++++++++-------- .../spark/util/sketch/Murmur3_128Suite.java | 28 ++- .../spark/util/sketch/BloomFilterSuite.scala | 31 ++- .../util/sketch/RoaringBitmapArraySuite.scala | 2 +- 7 files changed, 293 insertions(+), 207 deletions(-) diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java index b7b01ae75586..58dd996ab0e3 100644 --- a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java @@ -51,7 +51,7 @@ public enum Version { *
  • The words/longs (numWords * 64 bit)
  • * */ - V1(1); + V1(2); private final int versionNumber; @@ -233,5 +233,4 @@ public static BloomFilter create(long expectedNumItems, long numBits) { return new BloomFilterImpl(optimalNumOfHashFunctions(expectedNumItems, numBits), numBits); } - -} +} \ No newline at end of file diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java index 1119658bf225..daa6c866ab66 100644 --- a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java @@ -253,5 +253,4 @@ private void writeObject(ObjectOutputStream out) throws IOException { private void readObject(ObjectInputStream in) throws IOException { readFrom0(in); } - -} +} \ No newline at end of file diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/Murmur3_128.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/Murmur3_128.java index 31041b4a1436..0de3ecd71f91 100644 --- a/common/sketch/src/main/java/org/apache/spark/util/sketch/Murmur3_128.java +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/Murmur3_128.java @@ -19,117 +19,132 @@ /** * 128-bit Murmur3 hasher. - * Best perfomance is on x86_64 platform - * Based on implementation https://github.com/aappleby/smhasher/blob/master/src/MurmurHash3.cpp + * Best performance is on x86_64 platform + * Based on implementation smhasher + * and SOLR implementation. */ final class Murmur3_128 { - private static final long C1 = 0x87c37b91114253d5L; - private static final long C2 = 0x4cf5ad432745937fL; + private static final long C1 = 0x87c37b91114253d5L; + private static final long C2 = 0x4cf5ad432745937fL; - static void hashBytes(byte[] data, long seed, long[] hashes) { - hash(data, Platform.BYTE_ARRAY_OFFSET, data.length, seed, hashes); - } + static void hashBytes(byte[] data, long seed, long[] hashes) { + hash(data, Platform.BYTE_ARRAY_OFFSET, data.length, seed, hashes); + } - static void hashLong(long data, long seed, long[] hashes) { - hash(new long[]{data}, Platform.LONG_ARRAY_OFFSET, 8, seed, hashes); // 8 - long`s size in bytes - } + static void hashLong(long data, long seed, long[] hashes) { + hash(new long[]{data}, Platform.LONG_ARRAY_OFFSET, 8, seed, hashes); // 8 - long`s size in bytes + } - @SuppressWarnings("fallthrough") - private static void hash(Object key, int offset, int length, long seed, long[] result) { - long h1 = seed & 0x00000000FFFFFFFFL; - long h2 = seed & 0x00000000FFFFFFFFL; - - int roundedEnd = offset + (length & 0xFFFFFFF0); // round down to 16 byte block - for (int i=offset; i>> 33; - k *= 0xff51afd7ed558ccdL; - k ^= k >>> 33; - k *= 0xc4ceb9fe1a85ec53L; - k ^= k >>> 33; - return k; - } + h1 ^= mixK1(k1); - private static long mixK1(long k1) { - k1 *= C1; - k1 = Long.rotateLeft(k1,31); - k1 *= C2; - return k1; - } + h1 = Long.rotateLeft(h1, 27); + h1 += h2; + h1 = h1 * 5 + 0x52dce729; - private static long mixK2(long k2) { - k2 *= C2; - k2 = Long.rotateLeft(k2,33); - k2 *= C1; - return k2; - } + h2 ^= mixK2(k2); + h2 = Long.rotateLeft(h2, 31); + h2 += h1; + h2 = h2 * 5 + 0x38495ab5; + } + long k1 = 0; + long k2 = 0; + + switch (length & 15) { + case 15: + k2 = (Platform.getByte(key, roundedEnd + 14) & 0xFFL) << 48; // fall through + case 14: + k2 |= (Platform.getByte(key, roundedEnd + 13) & 0xFFL) << 40; // fall through + case 13: + k2 |= (Platform.getByte(key, roundedEnd + 12) & 0xFFL) << 32; // fall through + case 12: + k2 |= (Platform.getByte(key, roundedEnd + 11) & 0xFFL) << 24; // fall through + case 11: + k2 |= (Platform.getByte(key, roundedEnd + 10) & 0xFFL) << 16; // fall through + case 10: + k2 |= (Platform.getByte(key, roundedEnd + 9) & 0xFFL) << 8; // fall through + case 9: + k2 |= (Platform.getByte(key, roundedEnd + 8) & 0xFFL); // fall through + h2 ^= mixK2(k2); + case 8: + k1 = ((long) Platform.getByte(key, roundedEnd + 7)) << 56; // fall through + case 7: + k1 |= (Platform.getByte(key, roundedEnd + 6) & 0xFFL) << 48; // fall through + case 6: + k1 |= (Platform.getByte(key, roundedEnd + 5) & 0xFFL) << 40; // fall through + case 5: + k1 |= (Platform.getByte(key, roundedEnd + 4) & 0xFFL) << 32; // fall through + case 4: + k1 |= (Platform.getByte(key, roundedEnd + 3) & 0xFFL) << 24; // fall through + case 3: + k1 |= (Platform.getByte(key, roundedEnd + 2) & 0xFFL) << 16; // fall through + case 2: + k1 |= (Platform.getByte(key, roundedEnd + 1) & 0xFFL) << 8; // fall through + case 1: + k1 |= (Platform.getByte(key, roundedEnd) & 0xFFL); + h1 ^= mixK1(k1); + } + h1 ^= length; + h2 ^= length; + + h1 += h2; + h2 += h1; + + h1 = fmix64(h1); + h2 = fmix64(h2); + + h1 += h2; + h2 += h1; + + result[0] = h1; + result[1] = h2; + } + + /** + * Gets a long from a byte buffer in little endian byte order. + */ + private static long getLongLittleEndian(Object key, int offset) { + return (Platform.getByte(key, offset) & 0xFFL) + | ((Platform.getByte(key, offset + 1) & 0xFFL) << 8) + | ((Platform.getByte(key, offset + 2) & 0xFFL) << 16) + | ((Platform.getByte(key, offset + 3) & 0xFFL) << 24) + | ((Platform.getByte(key, offset + 4) & 0xFFL) << 32) + | ((Platform.getByte(key, offset + 5) & 0xFFL) << 40) + | ((Platform.getByte(key, offset + 6) & 0xFFL) << 48) + | (((long) Platform.getByte(key, offset + 7)) << 56); + } + + private static long fmix64(long k) { + k ^= k >>> 33; + k *= 0xff51afd7ed558ccdL; + k ^= k >>> 33; + k *= 0xc4ceb9fe1a85ec53L; + k ^= k >>> 33; + return k; + } + + private static long mixK1(long k1) { + k1 *= C1; + k1 = Long.rotateLeft(k1, 31); + k1 *= C2; + return k1; + } + + private static long mixK2(long k2) { + k2 *= C2; + k2 = Long.rotateLeft(k2, 33); + k2 *= C1; + return k2; + } } \ No newline at end of file diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/RoaringBitmapArray.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/RoaringBitmapArray.java index be16d80edf2c..fa973ac48f21 100644 --- a/common/sketch/src/main/java/org/apache/spark/util/sketch/RoaringBitmapArray.java +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/RoaringBitmapArray.java @@ -24,113 +24,137 @@ import java.io.IOException; import java.util.Arrays; +/** + * This class represents bit vector with + * {@link #set(long index) set} and {@link #get(long index) get} methods. + * It is memory efficient and faster in operations {@link #set(long index) set} + * and {@link #get(long index) get} than {@link java.util.BitSet} since we are using + * {@link org.roaringbitmap.RoaringBitmap} + * GitHub repository. + * Unfortunately, current version of {@link org.roaringbitmap.RoaringBitmap} supports + * only {@code int} indexes and limited to {@code Integer.MAX_VALUE} in size + * {@see https://github.com/RoaringBitmap/RoaringBitmap/issues/109}. + * To support {@code Long.MAX_VALUE} size we have to maintain + * array of {@link org.roaringbitmap.RoaringBitmap}. + */ class RoaringBitmapArray { - private final RoaringBitmap[] data; - private long bitCount; // number of 1`s in bitset - private final long numBits; // total number of available bits - - private static int numOfBuckets(long numBits) { - if (numBits <= 0) { - throw new IllegalArgumentException("numBits must be positive, but got " + numBits); - } - return (int) Math.ceil(numBits / (double)Integer.MAX_VALUE); - } - - private static RoaringBitmap[] initialVector(int numOfBuckets){ - RoaringBitmap[] vector = new RoaringBitmap[numOfBuckets]; - for(int i = 0;i Boolean = _ % 2 == 0 + val even = allBytes.filter(isEven) + val odd = allBytes.filterNot(isEven) + + val filter1 = BloomFilter.create(256) + even.foreach(filter1.put) + + val filter2 = BloomFilter.create(256) + odd.foreach(filter2.put) + + filter1.mergeInPlace(filter2) + + // After merge, `filter1` has `numItems` items which doesn't exceed `expectedNumItems`, so the + // `expectedFpp` should not be significantly higher than the default one. + assert(filter1.expectedFpp() - BloomFilter.DEFAULT_FPP < EPSILON) + + even.foreach(i => assert(filter1.mightContain(i))) + odd.foreach(i => assert(filter1.mightContain(i))) + + checkSerDe(filter1) + } +} \ No newline at end of file diff --git a/common/sketch/src/test/scala/org/apache/spark/util/sketch/RoaringBitmapArraySuite.scala b/common/sketch/src/test/scala/org/apache/spark/util/sketch/RoaringBitmapArraySuite.scala index 16ded6af3a87..a302fc0c52f7 100644 --- a/common/sketch/src/test/scala/org/apache/spark/util/sketch/RoaringBitmapArraySuite.scala +++ b/common/sketch/src/test/scala/org/apache/spark/util/sketch/RoaringBitmapArraySuite.scala @@ -65,4 +65,4 @@ class RoaringBitmapArraySuite extends FunSuite { // scalastyle:ignore funsuite indexes2.foreach(i => assert(bitArray1.get(i))) assert(bitArray1.cardinality() == (indexes1 ++ indexes2).distinct.length) } -} +} \ No newline at end of file From 487da8fbd23f2d861ab10a1faf701696e9ca3967 Mon Sep 17 00:00:00 2001 From: Alexey Ponkin Date: Thu, 17 Nov 2016 23:46:57 +0300 Subject: [PATCH 3/3] SPARK-18252: Fixing RAT style errors --- .../org/apache/spark/util/sketch/BloomFilter.java | 2 +- .../apache/spark/util/sketch/BloomFilterImpl.java | 2 +- .../org/apache/spark/util/sketch/Murmur3_128.java | 2 +- .../apache/spark/util/sketch/Murmur3_128Suite.java | 2 +- .../apache/spark/util/sketch/BloomFilterSuite.scala | 13 +++---------- .../spark/util/sketch/RoaringBitmapArraySuite.scala | 2 +- 6 files changed, 8 insertions(+), 15 deletions(-) diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java index 58dd996ab0e3..f7e9143064ac 100644 --- a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilter.java @@ -233,4 +233,4 @@ public static BloomFilter create(long expectedNumItems, long numBits) { return new BloomFilterImpl(optimalNumOfHashFunctions(expectedNumItems, numBits), numBits); } -} \ No newline at end of file +} diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java index daa6c866ab66..fde2d14d20cb 100644 --- a/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/BloomFilterImpl.java @@ -253,4 +253,4 @@ private void writeObject(ObjectOutputStream out) throws IOException { private void readObject(ObjectInputStream in) throws IOException { readFrom0(in); } -} \ No newline at end of file +} diff --git a/common/sketch/src/main/java/org/apache/spark/util/sketch/Murmur3_128.java b/common/sketch/src/main/java/org/apache/spark/util/sketch/Murmur3_128.java index 0de3ecd71f91..9041e203966d 100644 --- a/common/sketch/src/main/java/org/apache/spark/util/sketch/Murmur3_128.java +++ b/common/sketch/src/main/java/org/apache/spark/util/sketch/Murmur3_128.java @@ -147,4 +147,4 @@ private static long mixK2(long k2) { k2 *= C1; return k2; } -} \ No newline at end of file +} diff --git a/common/sketch/src/test/java/org/apache/spark/util/sketch/Murmur3_128Suite.java b/common/sketch/src/test/java/org/apache/spark/util/sketch/Murmur3_128Suite.java index 230d20719243..e3b6c491d33a 100644 --- a/common/sketch/src/test/java/org/apache/spark/util/sketch/Murmur3_128Suite.java +++ b/common/sketch/src/test/java/org/apache/spark/util/sketch/Murmur3_128Suite.java @@ -47,4 +47,4 @@ private static void assertHash(int seed, long expected1, long expected2, String Assert.assertEquals(expected1, hash128bit[0]); Assert.assertEquals(expected2, hash128bit[1]); } -} \ No newline at end of file +} diff --git a/common/sketch/src/test/scala/org/apache/spark/util/sketch/BloomFilterSuite.scala b/common/sketch/src/test/scala/org/apache/spark/util/sketch/BloomFilterSuite.scala index 96486e17f093..cee8370256e9 100644 --- a/common/sketch/src/test/scala/org/apache/spark/util/sketch/BloomFilterSuite.scala +++ b/common/sketch/src/test/scala/org/apache/spark/util/sketch/BloomFilterSuite.scala @@ -130,11 +130,7 @@ class BloomFilterSuite extends FunSuite { // scalastyle:ignore funsuite } } - /** - * Instead of random generator we can check - * BloomFilter for every Byte value - * since it has only 256 values - */ + // Separate test for Byte type, since we can enumerate all possible values test(s"accuracy - Byte") { // Byte is from -128 to 127 inclusive val allBytes = (-128 to 127).map(_.toByte) @@ -162,10 +158,7 @@ class BloomFilterSuite extends FunSuite { // scalastyle:ignore funsuite checkSerDe(filter) } - /** - * Separate test for Byte type, - * Since we can enumerate all possible values - */ + // Separate test for Byte type, since we can enumerate all possible values test(s"mergeInPlace - Byte") { val allBytes = (-128 to 127).map(_.toByte) val fpp = 0.05 @@ -191,4 +184,4 @@ class BloomFilterSuite extends FunSuite { // scalastyle:ignore funsuite checkSerDe(filter1) } -} \ No newline at end of file +} diff --git a/common/sketch/src/test/scala/org/apache/spark/util/sketch/RoaringBitmapArraySuite.scala b/common/sketch/src/test/scala/org/apache/spark/util/sketch/RoaringBitmapArraySuite.scala index a302fc0c52f7..16ded6af3a87 100644 --- a/common/sketch/src/test/scala/org/apache/spark/util/sketch/RoaringBitmapArraySuite.scala +++ b/common/sketch/src/test/scala/org/apache/spark/util/sketch/RoaringBitmapArraySuite.scala @@ -65,4 +65,4 @@ class RoaringBitmapArraySuite extends FunSuite { // scalastyle:ignore funsuite indexes2.foreach(i => assert(bitArray1.get(i))) assert(bitArray1.cardinality() == (indexes1 ++ indexes2).distinct.length) } -} \ No newline at end of file +}