diff --git a/src/main/java/com/teragrep/pth10/steps/teragrep/TeragrepBloomStep.java b/src/main/java/com/teragrep/pth10/steps/teragrep/TeragrepBloomStep.java
index 55df783..967a958 100644
--- a/src/main/java/com/teragrep/pth10/steps/teragrep/TeragrepBloomStep.java
+++ b/src/main/java/com/teragrep/pth10/steps/teragrep/TeragrepBloomStep.java
@@ -47,10 +47,7 @@
 import com.teragrep.functions.dpf_03.BloomFilterAggregator;
 import com.teragrep.pth10.steps.AbstractStep;
-import com.teragrep.pth10.steps.teragrep.bloomfilter.BloomFilterForeachPartitionFunction;
-import com.teragrep.pth10.steps.teragrep.bloomfilter.BloomFilterTable;
-import com.teragrep.pth10.steps.teragrep.bloomfilter.FilterTypes;
-import com.teragrep.pth10.steps.teragrep.bloomfilter.LazyConnection;
+import com.teragrep.pth10.steps.teragrep.bloomfilter.*;
 import com.typesafe.config.Config;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -61,8 +58,7 @@
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
-import java.util.Map;
-import java.util.SortedMap;
+import java.util.*;
 
 /**
  * teragrep exec bloom
@@ -80,6 +76,8 @@ public enum BloomMode {
     private final String inputCol;
     private final String outputCol;
     private final String estimateCol;
+    private final FilterTypes filterTypes;
+    private final LazyConnection connection;
 
     public TeragrepBloomStep(
             Config zeppelinConfig,
@@ -87,12 +85,34 @@ public TeragrepBloomStep(
             String inputCol,
             String outputCol,
             String estimateCol
+    ) {
+        this(
+                zeppelinConfig,
+                mode,
+                inputCol,
+                outputCol,
+                estimateCol,
+                new FilterTypes(zeppelinConfig),
+                new LazyConnection(zeppelinConfig)
+        );
+    }
+
+    public TeragrepBloomStep(
+            Config zeppelinConfig,
+            BloomMode mode,
+            String inputCol,
+            String outputCol,
+            String estimateCol,
+            FilterTypes filterTypes,
+            LazyConnection connection
     ) {
         this.zeppelinConfig = zeppelinConfig;
         this.mode = mode;
         this.inputCol = inputCol;
         this.outputCol = outputCol;
         this.estimateCol = estimateCol;
+        this.connection = connection;
+        this.filterTypes = filterTypes;
 
         if (mode == BloomMode.ESTIMATE || mode == BloomMode.AGGREGATE) {
             // estimate is run as an aggregation
@@ -133,7 +153,8 @@ public Dataset<Row> get(Dataset<Row> dataset) {
      * @return Dataset unmodified
      */
     private Dataset<Row> createBloomFilter(Dataset<Row> dataset) {
-        writeFilterTypes(this.zeppelinConfig);
+        createFilterTypeTable();
+        writeFilterTypes();
         final BloomFilterTable table = new BloomFilterTable(zeppelinConfig);
         table.create();
         dataset.foreachPartition(new BloomFilterForeachPartitionFunction(this.zeppelinConfig));
@@ -147,7 +168,8 @@ private Dataset<Row> createBloomFilter(Dataset<Row> dataset) {
      * @return Dataset unmodified
      */
     private Dataset<Row> updateBloomFilter(Dataset<Row> dataset) {
-        writeFilterTypes(this.zeppelinConfig);
+        createFilterTypeTable();
+        writeFilterTypes();
         final BloomFilterTable table = new BloomFilterTable(zeppelinConfig);
         table.create();
         dataset.foreachPartition(new BloomFilterForeachPartitionFunction(this.zeppelinConfig, true));
@@ -162,47 +184,65 @@ private Dataset<Row> estimateSize(Dataset<Row> dataset) {
     }
 
     public Dataset<Row> aggregate(Dataset<Row> dataset) {
-
-        FilterTypes filterTypes = new FilterTypes(this.zeppelinConfig);
-
-        BloomFilterAggregator agg = new BloomFilterAggregator(inputCol, estimateCol, filterTypes.sortedMap());
-
+        final SortedMap<Long, Double> map = new TreeMap<>();
+        final List<FilterField> fieldList = filterTypes.fieldList();
+        for (final FilterField field : fieldList) {
+            map.put(field.expected(), field.fpp());
+        }
+        final BloomFilterAggregator agg = new BloomFilterAggregator(inputCol, estimateCol, map);
         return dataset.groupBy("partition").agg(agg.toColumn().as("bloomfilter"));
-
     }
 
-    private void writeFilterTypes(final Config config) {
-        final FilterTypes filterTypes = new FilterTypes(config);
-        final Connection connection = new LazyConnection(config).get();
-        final SortedMap<Long, Double> filterSizeMap = filterTypes.sortedMap();
+    private void writeFilterTypes() {
+        final List<FilterField> fieldList = filterTypes.fieldList();
         final String pattern = filterTypes.pattern();
-        for (final Map.Entry<Long, Double> entry : filterSizeMap.entrySet()) {
+        final Connection conn = connection.get();
+        for (final FilterField field : fieldList) {
+            final int expectedInt = field.expectedIntValue();
+            final double fpp = field.fpp();
             if (LOGGER.isInfoEnabled()) {
                 LOGGER
                         .info(
-                                "Writing filtertype (expected <[{}]>, fpp: <[{}]>, pattern: <[{}]>)", entry.getKey(),
-                                entry.getValue(), pattern
+                                "Writing filtertype (expected <[{}]>, fpp: <[{}]>, pattern: <[{}]>)", expectedInt, fpp,
+                                pattern
                         );
             }
             final String sql = "INSERT IGNORE INTO `filtertype` (`expectedElements`, `targetFpp`, `pattern`) VALUES (?, ?, ?)";
-            try (final PreparedStatement stmt = connection.prepareStatement(sql)) {
-                stmt.setInt(1, entry.getKey().intValue()); // filtertype.expectedElements
-                stmt.setDouble(2, entry.getValue()); // filtertype.targetFpp
+            try (final PreparedStatement stmt = conn.prepareStatement(sql)) {
+                stmt.setInt(1, expectedInt); // filtertype.expectedElements
+                stmt.setDouble(2, fpp); // filtertype.targetFpp
                 stmt.setString(3, pattern); // filtertype.pattern
                 stmt.executeUpdate();
                 stmt.clearParameters();
-                connection.commit();
+                conn.commit();
             }
             catch (SQLException e) {
                 if (LOGGER.isErrorEnabled()) {
                     LOGGER
                             .error(
                                     "Error writing filter[expected: <{}>, fpp: <{}>, pattern: <{}>] into database",
-                                    entry.getKey(), entry.getValue(), pattern
+                                    expectedInt, fpp, pattern
                             );
                 }
                 throw new RuntimeException(e);
            }
        }
    }
+
+    private void createFilterTypeTable() {
+        // from pth-06/database/bloomdb
+        final String sql = "CREATE TABLE IF NOT EXISTS `filtertype` ("
+                + "    `id` bigint(20) UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,"
+                + "    `expectedElements` bigint(20) UNSIGNED NOT NULL,"
+                + "    `targetFpp` DOUBLE(2, 2) UNSIGNED NOT NULL,"
+                + "    `pattern` VARCHAR(2048) NOT NULL,"
+                + "    UNIQUE KEY (`expectedElements`, `targetFpp`, `pattern`)"
+                + ") ENGINE = InnoDB"
+                + " DEFAULT CHARSET = utf8mb4"
+                + " COLLATE = utf8mb4_unicode_ci;";
+        try (final PreparedStatement statement = connection.get().prepareStatement(sql)) {
+            statement.execute();
+        }
+        catch (SQLException e) {
+            throw new RuntimeException("Error creating `filtertype` table: " + e.getMessage());
+        }
+    }
 }
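Note on the constructor split above: the five-argument constructor now delegates to a new seven-argument one, so the database-backed collaborators can be injected from the outside. A minimal wiring sketch, assuming `BloomMode` is the nested enum referenced in the hunk context and using an empty placeholder config plus hypothetical column names:

```java
import com.teragrep.pth10.steps.teragrep.TeragrepBloomStep;
import com.teragrep.pth10.steps.teragrep.bloomfilter.FilterTypes;
import com.teragrep.pth10.steps.teragrep.bloomfilter.LazyConnection;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class BloomStepWiringSketch {

    public static void main(String[] args) {
        // hypothetical placeholder config; real use carries the dpl.* options
        final Config config = ConfigFactory.empty();
        // collaborators are built once and handed in, instead of being
        // constructed internally from the config as the old code did
        final FilterTypes filterTypes = new FilterTypes(config);
        final LazyConnection connection = new LazyConnection(config);
        final TeragrepBloomStep step = new TeragrepBloomStep(
                config,
                TeragrepBloomStep.BloomMode.ESTIMATE,
                "tokens",      // inputCol (hypothetical)
                "bloomfilter", // outputCol (hypothetical)
                "estimate",    // estimateCol (hypothetical)
                filterTypes,
                connection
        );
    }
}
```

The delegation keeps existing call sites source-compatible while letting tests substitute fakes for the collaborators.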
diff --git a/src/main/java/com/teragrep/pth10/steps/teragrep/bloomfilter/FilterField.java b/src/main/java/com/teragrep/pth10/steps/teragrep/bloomfilter/FilterField.java
new file mode 100644
index 0000000..253a9b0
--- /dev/null
+++ b/src/main/java/com/teragrep/pth10/steps/teragrep/bloomfilter/FilterField.java
@@ -0,0 +1,94 @@
+/*
+ * Teragrep Data Processing Language (DPL) translator for Apache Spark (pth_10)
+ * Copyright (C) 2019-2024 Suomen Kanuuna Oy
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ *
+ * Additional permission under GNU Affero General Public License version 3
+ * section 7
+ *
+ * If you modify this Program, or any covered work, by linking or combining it
+ * with other code, such other code is not for that reason alone subject to any
+ * of the requirements of the GNU Affero GPL version 3 as long as this Program
+ * is the same Program as licensed from Suomen Kanuuna Oy without any additional
+ * modifications.
+ *
+ * Supplemented terms under GNU Affero General Public License version 3
+ * section 7
+ *
+ * Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
+ * versions must be marked as "Modified version of" The Program.
+ *
+ * Names of the licensors and authors may not be used for publicity purposes.
+ *
+ * No rights are granted for use of trade names, trademarks, or service marks
+ * which are in The Program if any.
+ *
+ * Licensee must indemnify licensors and authors for any liability that these
+ * contractual assumptions impose on licensors and authors.
+ *
+ * To the extent this program is licensed as part of the Commercial versions of
+ * Teragrep, the applicable Commercial License may apply to this file if you as
+ * a licensee so wish it.
+ */
+package com.teragrep.pth10.steps.teragrep.bloomfilter;
+
+import org.apache.spark.util.sketch.BloomFilter;
+
+import java.util.Objects;
+
+public final class FilterField {
+
+    private final Long expected;
+    private final Double fpp;
+
+    public FilterField(long expected, double fpp) {
+        this.expected = expected;
+        this.fpp = fpp;
+    }
+
+    public Long expected() {
+        return expected;
+    }
+
+    public int expectedIntValue() {
+        return expected.intValue();
+    }
+
+    public Double fpp() {
+        return fpp;
+    }
+
+    public long bitSize() {
+        return BloomFilter.create(expected, fpp).bitSize();
+    }
+
+    @Override
+    public boolean equals(Object object) {
+        if (this == object)
+            return true;
+        if (object == null)
+            return false;
+        if (object.getClass() != this.getClass())
+            return false;
+        final FilterField cast = (FilterField) object;
+        return this.expected.equals(cast.expected) && this.fpp.equals(cast.fpp);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(expected, fpp);
+    }
+}
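For reference, the contract of the new value class in runnable form; this sketch only assumes `FilterField` is importable from the package above:

```java
import com.teragrep.pth10.steps.teragrep.bloomfilter.FilterField;
import org.apache.spark.util.sketch.BloomFilter;

public class FilterFieldSketch {

    public static void main(String[] args) {
        final FilterField a = new FilterField(1000L, 0.01);
        final FilterField b = new FilterField(1000L, 0.01);

        // value equality: same expected item count and same fpp
        System.out.println(a.equals(b)); // true

        // bitSize() delegates to Spark's sizing formula for these parameters
        final long sparkBits = BloomFilter.create(1000L, 0.01).bitSize();
        System.out.println(a.bitSize() == sparkBits); // true
    }
}
```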
diff --git a/src/main/java/com/teragrep/pth10/steps/teragrep/bloomfilter/FilterTypes.java b/src/main/java/com/teragrep/pth10/steps/teragrep/bloomfilter/FilterTypes.java
index 6bd7657..4bb7a1f 100644
--- a/src/main/java/com/teragrep/pth10/steps/teragrep/bloomfilter/FilterTypes.java
+++ b/src/main/java/com/teragrep/pth10/steps/teragrep/bloomfilter/FilterTypes.java
@@ -46,9 +46,9 @@
 package com.teragrep.pth10.steps.teragrep.bloomfilter;
 
 import com.google.gson.Gson;
-import com.google.gson.JsonObject;
+import com.google.gson.JsonIOException;
+import com.google.gson.JsonSyntaxException;
 import com.typesafe.config.Config;
-import org.apache.spark.util.sketch.BloomFilter;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.sparkproject.guava.reflect.TypeToken;
@@ -73,35 +73,32 @@ public FilterTypes(Config config) {
      *
-     * @return SortedMap of filter configuration
+     * @return List of filter field configurations
      */
-    public SortedMap<Long, Double> sortedMap() {
-        final SortedMap<Long, Double> sizesMapFromJson = new TreeMap<>();
-        final Gson gson = new Gson();
-        final List<JsonObject> jsonArray = gson.fromJson(sizesJsonString(), new TypeToken<List<JsonObject>>() {
-        }.getType());
-        for (final JsonObject object : jsonArray) {
-            if (object.has("expected") && object.has("fpp")) {
-                final Long expectedNumOfItems = Long.parseLong(object.get("expected").toString());
-                final Double fpp = Double.parseDouble(object.get("fpp").toString());
-                if (sizesMapFromJson.containsKey(expectedNumOfItems)) {
-                    LOGGER.error("Duplicate value of expected number of items value: <[{}]>", expectedNumOfItems);
-                    throw new RuntimeException("Duplicate entry expected num of items");
-                }
-                sizesMapFromJson.put(expectedNumOfItems, fpp);
-            }
-            else {
-                throw new RuntimeException("JSON did not have expected values of 'expected' or 'fpp'");
-            }
+    public List<FilterField> fieldList() {
+        final List<FilterField> fieldList;
+        try {
+            final Gson gson = new Gson();
+            fieldList = gson.fromJson(sizesJsonString(), new TypeToken<List<FilterField>>() {
+            }.getType());
+        }
+        catch (JsonIOException | JsonSyntaxException e) {
+            throw new RuntimeException(
+                    "Error reading 'dpl.pth_06.bloom.db.fields' option to JSON, check that option is formatted as JSON array and that there are no duplicate values: "
+                            + e.getMessage()
+            );
+        }
+        final boolean hasDuplicates = new HashSet<>(fieldList).size() != fieldList.size();
+        if (hasDuplicates) {
+            throw new RuntimeException("Found duplicate values in 'dpl.pth_06.bloom.db.fields'");
         }
-        return sizesMapFromJson;
+        return fieldList;
     }
 
     public Map<Long, Long> bitSizeMap() {
-        final Map<Long, Double> filterSizes = sortedMap();
+        final List<FilterField> fieldList = fieldList();
         final Map<Long, Long> bitsizeToExpectedItemsMap = new HashMap<>();
         // Calculate bitSizes
-        for (final Map.Entry<Long, Double> entry : filterSizes.entrySet()) {
-            final BloomFilter bf = BloomFilter.create(entry.getKey(), entry.getValue());
-            bitsizeToExpectedItemsMap.put(bf.bitSize(), entry.getKey());
+        for (final FilterField field : fieldList) {
+            bitsizeToExpectedItemsMap.put(field.bitSize(), field.expected());
         }
         return bitsizeToExpectedItemsMap;
     }
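The reworked fieldList() lets Gson bind the JSON array straight onto FilterField and then relies on value equality for the duplicate check. A standalone sketch of the same parsing logic; it substitutes Gson's bundled TypeToken for the Spark-shaded Guava TypeToken used in production (both expose getType()):

```java
import com.google.gson.Gson;
import com.google.gson.reflect.TypeToken;
import com.teragrep.pth10.steps.teragrep.bloomfilter.FilterField;

import java.util.HashSet;
import java.util.List;

public class FieldListParseSketch {

    public static void main(String[] args) {
        // same shape as the 'dpl.pth_06.bloom.db.fields' option value
        final String json = "[{expected: 1000, fpp: 0.01}, {expected: 2000, fpp: 0.01}]";
        final List<FilterField> fields = new Gson()
                .fromJson(json, new TypeToken<List<FilterField>>() {
                }.getType());
        // HashSet deduplicates via FilterField.equals()/hashCode()
        if (new HashSet<>(fields).size() != fields.size()) {
            throw new RuntimeException("Found duplicate values");
        }
        fields.forEach(f -> System.out.println(f.expected() + " -> " + f.fpp()));
    }
}
```

Gson's fromJson reader is lenient, which is why the unquoted keys used in the tests below parse successfully.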
diff --git a/src/main/java/com/teragrep/pth10/steps/teragrep/bloomfilter/TeragrepBloomFilter.java b/src/main/java/com/teragrep/pth10/steps/teragrep/bloomfilter/TeragrepBloomFilter.java
index 257d843..fb2f4f7 100644
--- a/src/main/java/com/teragrep/pth10/steps/teragrep/bloomfilter/TeragrepBloomFilter.java
+++ b/src/main/java/com/teragrep/pth10/steps/teragrep/bloomfilter/TeragrepBloomFilter.java
@@ -54,8 +54,10 @@
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
+import java.util.stream.Collectors;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -91,9 +93,17 @@ public void saveFilter(final Boolean overwrite) {
         final Double selectedFpp;
         final Map<Long, Long> bitSizeMap = filterTypes.bitSizeMap();
         if (bitSizeMap.containsKey(bitSize)) {
-            final long expectedItems = bitSizeMap.get(bitSize);
-            selectedExpectedNumOfItems = expectedItems;
-            selectedFpp = filterTypes.sortedMap().get(expectedItems);
+            selectedExpectedNumOfItems = bitSizeMap.get(bitSize);
+            List<Double> fppList = filterTypes
+                    .fieldList()
+                    .stream()
+                    .filter(f -> f.expected().equals(selectedExpectedNumOfItems))
+                    .map(FilterField::fpp)
+                    .collect(Collectors.toList());
+            if (fppList.size() != 1) {
+                throw new RuntimeException("Could not find fpp value for bit size: <[" + bitSize + "]>");
+            }
+            selectedFpp = fppList.get(0);
         }
         else {
             throw new IllegalArgumentException("no such filterSize <[" + bitSize + "]>");
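The fpp lookup added to saveFilter() can be read in isolation as below; the single-match requirement holds because fieldList() already rejects duplicate expected values:

```java
import com.teragrep.pth10.steps.teragrep.bloomfilter.FilterField;

import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

public class FppLookupSketch {

    public static void main(String[] args) {
        final List<FilterField> fieldList = Arrays
                .asList(new FilterField(1000L, 0.01), new FilterField(2000L, 0.05));
        final Long selectedExpected = 2000L;
        final List<Double> fppList = fieldList
                .stream()
                .filter(f -> f.expected().equals(selectedExpected))
                .map(FilterField::fpp)
                .collect(Collectors.toList());
        // exactly one match is expected; anything else means misconfiguration
        if (fppList.size() != 1) {
            throw new RuntimeException("Could not find fpp value for expected: <[" + selectedExpected + "]>");
        }
        System.out.println(fppList.get(0)); // 0.05
    }
}
```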
diff --git a/src/test/java/com/teragrep/pth10/steps/teragrep/bloomfilter/BloomFilterTableTest.java b/src/test/java/com/teragrep/pth10/steps/teragrep/bloomfilter/BloomFilterTableTest.java
index fe06e67..c9e3160 100644
--- a/src/test/java/com/teragrep/pth10/steps/teragrep/bloomfilter/BloomFilterTableTest.java
+++ b/src/test/java/com/teragrep/pth10/steps/teragrep/bloomfilter/BloomFilterTableTest.java
@@ -60,7 +60,7 @@ class BloomFilterTableTest {
 
     final String username = "sa";
     final String password = "";
-    final String connectionUrl = "jdbc:h2:~/test;MODE=MariaDB;DATABASE_TO_LOWER=TRUE;CASE_INSENSITIVE_IDENTIFIERS=TRUE";
+    final String connectionUrl = "jdbc:h2:mem:test;MODE=MariaDB;DATABASE_TO_LOWER=TRUE;CASE_INSENSITIVE_IDENTIFIERS=TRUE";
 
     @BeforeAll
     void setEnv() {
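The connection URL change above (repeated in TeragrepBloomFilterTest below) moves the tests from a file-backed H2 database in the developer's home directory to an in-memory one, so state cannot leak between runs or between machines. A small sketch of the lifecycle difference, using plain JDBC and assuming the H2 driver is on the classpath:

```java
import java.sql.Connection;
import java.sql.DriverManager;

public class H2MemLifecycleSketch {

    public static void main(String[] args) throws Exception {
        final String url = "jdbc:h2:mem:test;MODE=MariaDB;DATABASE_TO_LOWER=TRUE;CASE_INSENSITIVE_IDENTIFIERS=TRUE";
        try (Connection conn = DriverManager.getConnection(url, "sa", "")) {
            conn.prepareStatement("CREATE TABLE t (id INT)").execute();
        }
        // without DB_CLOSE_DELAY=-1 the in-memory database is discarded when
        // the last connection closes; the old file URL (jdbc:h2:~/test) would
        // have persisted the table across JVM runs
    }
}
```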
diff --git a/src/test/java/com/teragrep/pth10/steps/teragrep/bloomfilter/FilterFieldTest.java b/src/test/java/com/teragrep/pth10/steps/teragrep/bloomfilter/FilterFieldTest.java
new file mode 100644
index 0000000..2984e3f
--- /dev/null
+++ b/src/test/java/com/teragrep/pth10/steps/teragrep/bloomfilter/FilterFieldTest.java
@@ -0,0 +1,87 @@
+/*
+ * Teragrep Data Processing Language (DPL) translator for Apache Spark (pth_10)
+ * Copyright (C) 2019-2024 Suomen Kanuuna Oy
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <https://www.gnu.org/licenses/>.
+ *
+ *
+ * Additional permission under GNU Affero General Public License version 3
+ * section 7
+ *
+ * If you modify this Program, or any covered work, by linking or combining it
+ * with other code, such other code is not for that reason alone subject to any
+ * of the requirements of the GNU Affero GPL version 3 as long as this Program
+ * is the same Program as licensed from Suomen Kanuuna Oy without any additional
+ * modifications.
+ *
+ * Supplemented terms under GNU Affero General Public License version 3
+ * section 7
+ *
+ * Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
+ * versions must be marked as "Modified version of" The Program.
+ *
+ * Names of the licensors and authors may not be used for publicity purposes.
+ *
+ * No rights are granted for use of trade names, trademarks, or service marks
+ * which are in The Program if any.
+ *
+ * Licensee must indemnify licensors and authors for any liability that these
+ * contractual assumptions impose on licensors and authors.
+ *
+ * To the extent this program is licensed as part of the Commercial versions of
+ * Teragrep, the applicable Commercial License may apply to this file if you as
+ * a licensee so wish it.
+ */
+package com.teragrep.pth10.steps.teragrep.bloomfilter;
+
+import org.apache.spark.util.sketch.BloomFilter;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+public class FilterFieldTest {
+
+    @Test
+    public void testBitSize() {
+        long expected = BloomFilter.create(1000, 0.01).bitSize();
+        long result = new FilterField(1000, 0.01).bitSize();
+        Assertions.assertEquals(expected, result);
+    }
+
+    @Test
+    public void testEquality() {
+        FilterField field1 = new FilterField(10, 0.01);
+        FilterField field2 = new FilterField(10, 0.01);
+        Assertions.assertEquals(field1, field2);
+    }
+
+    @Test
+    public void testNonEquals() {
+        FilterField field1 = new FilterField(10, 0.01);
+        FilterField field2 = new FilterField(100, 0.01);
+        FilterField field3 = new FilterField(10, 0.02);
+        Assertions.assertNotEquals(field1, field2);
+        Assertions.assertNotEquals(field1, field3);
+    }
+
+    @Test
+    public void testHashCode() {
+        FilterField field1 = new FilterField(10, 0.01);
+        FilterField field2 = new FilterField(10, 0.01);
+        FilterField field3 = new FilterField(11, 0.01);
+        FilterField field4 = new FilterField(10, 0.02);
+        Assertions.assertEquals(field1.hashCode(), field2.hashCode());
+        Assertions.assertNotEquals(field1.hashCode(), field3.hashCode());
+        Assertions.assertNotEquals(field1.hashCode(), field4.hashCode());
+    }
+}
diff --git a/src/test/java/com/teragrep/pth10/steps/teragrep/bloomfilter/FilterTypesTest.java b/src/test/java/com/teragrep/pth10/steps/teragrep/bloomfilter/FilterTypesTest.java
index ac432ec..0b0b332 100644
--- a/src/test/java/com/teragrep/pth10/steps/teragrep/bloomfilter/FilterTypesTest.java
+++ b/src/test/java/com/teragrep/pth10/steps/teragrep/bloomfilter/FilterTypesTest.java
@@ -51,6 +51,7 @@
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
@@ -59,7 +60,7 @@ class FilterTypesTest {
 
     @Test
-    public void testSortedMapMethod() {
+    public void testFieldListMethod() {
         Properties properties = new Properties();
         properties
                 .put(
@@ -69,12 +70,46 @@ public void testSortedMapMethod() {
                 );
         Config config = ConfigFactory.parseProperties(properties);
         FilterTypes filterTypes = new FilterTypes(config);
-        Map<Long, Double> resultMap = filterTypes.sortedMap();
-        assertEquals(0.01, resultMap.get(1000L));
-        assertEquals(0.01, resultMap.get(2000L));
-        assertEquals(0.01, resultMap.get(3000L));
-        assertEquals(3, resultMap.size());
+        List<FilterField> fieldList = filterTypes.fieldList();
+        assertEquals(1000, fieldList.get(0).expected());
+        assertEquals(2000, fieldList.get(1).expected());
+        assertEquals(3000, fieldList.get(2).expected());
+        assertEquals(0.01, fieldList.get(0).fpp());
+        assertEquals(0.01, fieldList.get(1).fpp());
+        assertEquals(0.01, fieldList.get(2).fpp());
+        assertEquals(3, fieldList.size());
+    }
+
+    @Test
+    public void testMalformattedFieldsOption() {
+        Properties properties = new Properties();
+        properties
+                .put(
+                        "dpl.pth_06.bloom.db.fields",
+                        "[" + "{expected: 1000, fpp: 0.01}," + "{expected: 2000, fpp: 0.01},"
+                                + "{expected: 3000, fpp: 0.01}"
+                );
+        Config config = ConfigFactory.parseProperties(properties);
+        FilterTypes filterTypes = new FilterTypes(config);
+        RuntimeException exception = assertThrows(RuntimeException.class, filterTypes::fieldList);
+        String expectedError = "Error reading 'dpl.pth_06.bloom.db.fields' option to JSON, check that option is formatted as JSON array and that there are no duplicate values: ";
+        Assertions.assertTrue(exception.getMessage().contains(expectedError));
+    }
+
+    @Test
+    public void testDuplicateValues() {
+        Properties properties = new Properties();
+        properties
+                .put(
+                        "dpl.pth_06.bloom.db.fields",
+                        "[" + "{expected: 1000, fpp: 0.01}," + "{expected: 1000, fpp: 0.01},"
+                                + "{expected: 3000, fpp: 0.01}]"
+                );
+        Config config = ConfigFactory.parseProperties(properties);
+        FilterTypes filterTypes = new FilterTypes(config);
+        RuntimeException exception = assertThrows(RuntimeException.class, filterTypes::fieldList);
+        String expectedError = "Found duplicate values in 'dpl.pth_06.bloom.db.fields'";
+        Assertions.assertEquals(expectedError, exception.getMessage());
     }
 
     @Test
@@ -131,7 +166,7 @@ public void testEquals() {
         Config config = ConfigFactory.parseProperties(properties);
         FilterTypes filterTypes1 = new FilterTypes(config);
         FilterTypes filterTypes2 = new FilterTypes(config);
-        filterTypes1.sortedMap();
+        filterTypes1.fieldList();
         filterTypes1.pattern();
         filterTypes1.tableName();
         filterTypes1.bitSizeMap();
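The configuration surface these tests exercise boils down to a single property holding a JSON array. A compact happy-path sketch, assuming FilterTypes and FilterField are on the classpath:

```java
import com.teragrep.pth10.steps.teragrep.bloomfilter.FilterTypes;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

import java.util.Properties;

public class FieldsOptionSketch {

    public static void main(String[] args) {
        final Properties properties = new Properties();
        // the whole field list is one JSON array carried in one property
        properties.put(
                "dpl.pth_06.bloom.db.fields",
                "[{expected: 1000, fpp: 0.01}, {expected: 2000, fpp: 0.01}, {expected: 3000, fpp: 0.01}]"
        );
        final Config config = ConfigFactory.parseProperties(properties);
        final FilterTypes filterTypes = new FilterTypes(config);
        // prints "1000 -> 0.01", "2000 -> 0.01", "3000 -> 0.01"
        filterTypes.fieldList().forEach(f -> System.out.println(f.expected() + " -> " + f.fpp()));
    }
}
```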
diff --git a/src/test/java/com/teragrep/pth10/steps/teragrep/bloomfilter/TeragrepBloomFilterTest.java b/src/test/java/com/teragrep/pth10/steps/teragrep/bloomfilter/TeragrepBloomFilterTest.java
index a354f06..6f84ae2 100644
--- a/src/test/java/com/teragrep/pth10/steps/teragrep/bloomfilter/TeragrepBloomFilterTest.java
+++ b/src/test/java/com/teragrep/pth10/steps/teragrep/bloomfilter/TeragrepBloomFilterTest.java
@@ -66,7 +66,7 @@ class TeragrepBloomFilterTest {
     private LazyConnection lazyConnection;
     private FilterTypes filterTypes;
     private final BloomFilter emptyFilter = BloomFilter.create(100, 0.01);
-    private SortedMap<Long, Double> sizeMap;
+    private List<FilterField> filterFields;
     private final String tableName = "bloomfilter_test";
 
     @BeforeAll
@@ -76,7 +76,7 @@ void setEnv() {
         properties.put("dpl.pth_10.bloom.db.username", username);
         String password = "";
         properties.put("dpl.pth_10.bloom.db.password", password);
-        String connectionUrl = "jdbc:h2:~/test;MODE=MariaDB;DATABASE_TO_LOWER=TRUE;CASE_INSENSITIVE_IDENTIFIERS=TRUE";
+        String connectionUrl = "jdbc:h2:mem:test;MODE=MariaDB;DATABASE_TO_LOWER=TRUE;CASE_INSENSITIVE_IDENTIFIERS=TRUE";
         properties.put("dpl.pth_06.bloom.db.url", connectionUrl);
         properties
                 .put(
@@ -96,7 +96,7 @@ void setEnv() {
             Class.forName("org.h2.Driver");
         });
         filterTypes = new FilterTypes(config);
-        sizeMap = filterTypes.sortedMap();
+        filterFields = filterTypes.fieldList();
         String createFilterType = "CREATE TABLE `filtertype` ("
                 + "`id` bigint(20) UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY,"
                 + "`expectedElements` bigint(20) NOT NULL," + "`targetFpp` DOUBLE UNSIGNED NOT NULL,"
@@ -112,12 +112,12 @@ void setEnv() {
             conn.prepareStatement(createTable).execute();
         });
         int loops = 0;
-        for (Map.Entry<Long, Double> entry : sizeMap.entrySet()) {
+        for (FilterField field : filterFields) {
             loops++;
             Assertions.assertDoesNotThrow(() -> {
                 PreparedStatement stmt = conn.prepareStatement(insertSql);
-                stmt.setInt(1, entry.getKey().intValue()); // filtertype.expectedElements
-                stmt.setDouble(2, entry.getValue()); // filtertype.targetFpp
+                stmt.setInt(1, field.expectedIntValue()); // filtertype.expectedElements
+                stmt.setDouble(2, field.fpp()); // filtertype.targetFpp
                 stmt.setString(3, pattern);
                 stmt.executeUpdate();
                 stmt.clearParameters();
@@ -140,14 +140,14 @@ public void tearDown() {
 
     @Test
     void testSavingToDatabase() {
         List<String> tokens = new ArrayList<>(Collections.singletonList("one"));
-        Row row = generatedRow(sizeMap, tokens);
+        Row row = generatedRow(filterFields, tokens);
         String partition = row.getString(0);
         byte[] filterBytes = (byte[]) row.get(1);
         BloomFilter rawFilter = Assertions
                 .assertDoesNotThrow(() -> BloomFilter.readFrom(new ByteArrayInputStream(filterBytes)));
         TeragrepBloomFilter filter = new TeragrepBloomFilter(partition, rawFilter, lazyConnection.get(), filterTypes);
         filter.saveFilter(false);
-        Map.Entry<Long, Double> entry = sizeMap.entrySet().iterator().next();
+        FilterField first = filterFields.get(0);
         String sql = "SELECT `filter` FROM `" + tableName + "`";
         Assertions.assertDoesNotThrow(() -> {
             ResultSet rs = lazyConnection.get().prepareStatement(sql).executeQuery();
@@ -165,7 +165,7 @@ void testSavingToDatabase() {
             Assertions.assertEquals(1, cols);
             Assertions.assertTrue(resultFilter.mightContain("one"));
             Assertions.assertFalse(resultFilter.mightContain("neo"));
-            Assertions.assertTrue(resultFilter.expectedFpp() <= entry.getValue());
+            Assertions.assertTrue(resultFilter.expectedFpp() <= first.fpp());
             rs.close();
         });
     }
@@ -173,7 +173,7 @@ void testSavingToDatabase() {
     @Test
     void testSavingToDatabaseWithOverwrite() {
         List<String> tokens = new ArrayList<>(Collections.singletonList("one"));
-        Row row = generatedRow(sizeMap, tokens);
+        Row row = generatedRow(filterFields, tokens);
         String partition = row.getString(0);
         byte[] filterBytes = (byte[]) row.get(1);
         BloomFilter rawFilter = Assertions
@@ -199,7 +199,7 @@ void testSavingToDatabaseWithOverwrite() {
         });
         // Create second filter that will overwrite first one
         List<String> secondTokens = new ArrayList<>(Collections.singletonList("neo"));
-        Row secondRow = generatedRow(sizeMap, secondTokens);
+        Row secondRow = generatedRow(filterFields, secondTokens);
         String secondPartition = secondRow.getString(0);
         byte[] secondFilterBytes = (byte[]) secondRow.get(1);
         BloomFilter rawFilter2 = Assertions
@@ -238,7 +238,7 @@ void testCorrectFilterSizeSelection() {
         for (int i = 1; i < 1500; i++) {
             tokens.add("token:" + i);
         }
-        Row row = generatedRow(sizeMap, tokens);
+        Row row = generatedRow(filterFields, tokens);
         String partition = row.getString(0);
         byte[] filterBytes = (byte[]) row.get(1);
         BloomFilter rawFilter = Assertions
@@ -246,13 +246,16 @@ void testCorrectFilterSizeSelection() {
         TeragrepBloomFilter filter = new TeragrepBloomFilter(partition, rawFilter, lazyConnection.get(), filterTypes);
         filter.saveFilter(false);
         long size = Long.MAX_VALUE;
-        for (long key : sizeMap.keySet()) {
-            if (size > key && key >= tokens.size()) {
-                size = key;
+        FilterField current = null;
+        for (FilterField field : filterFields) {
+            long expected = field.expected();
+            if (size > expected && expected >= tokens.size()) {
+                size = expected;
+                current = field;
             }
         }
-        Double fpp = sizeMap.get(size);
         String sql = "SELECT `filter` FROM `" + tableName + "`";
+        FilterField finalCurrent = current;
         Assertions.assertDoesNotThrow(() -> {
             ResultSet rs = lazyConnection.get().prepareStatement(sql).executeQuery();
             int cols = rs.getMetaData().getColumnCount();
@@ -269,7 +272,8 @@ void testCorrectFilterSizeSelection() {
             Assertions.assertEquals(1, cols);
             Assertions.assertTrue(resultFilter.mightContain("one"));
             Assertions.assertFalse(resultFilter.mightContain("neo"));
-            Assertions.assertTrue(resultFilter.expectedFpp() <= fpp);
+            Assertions.assertNotNull(finalCurrent);
+            Assertions.assertTrue(resultFilter.expectedFpp() <= finalCurrent.fpp());
             rs.close();
         });
     }
@@ -290,7 +294,7 @@ public void testPatternSavedToDatabase() {
 
     @Test
     public void testEquals() {
         List<String> tokens = new ArrayList<>(Collections.singletonList("one"));
-        Row row = generatedRow(sizeMap, tokens);
+        Row row = generatedRow(filterFields, tokens);
         String partition = row.getString(0);
         byte[] filterBytes = (byte[]) row.get(1);
         BloomFilter rawFilter = Assertions
@@ -305,8 +309,8 @@ public void testEquals() {
     public void testNotEqualsTokens() {
         List<String> tokens1 = new ArrayList<>(Collections.singletonList("one"));
         List<String> tokens2 = new ArrayList<>(Collections.singletonList("two"));
-        Row row1 = generatedRow(sizeMap, tokens1);
-        Row row2 = generatedRow(sizeMap, tokens2);
+        Row row1 = generatedRow(filterFields, tokens1);
+        Row row2 = generatedRow(filterFields, tokens2);
         BloomFilter rawFilter1 = Assertions
                 .assertDoesNotThrow(() -> BloomFilter.readFrom(new ByteArrayInputStream((byte[]) row1.get(1))));
         BloomFilter rawFilter2 = Assertions
@@ -328,14 +332,15 @@ public void testNotEqualsTokens() {
 
     // -- Helper methods --
 
-    private Row generatedRow(SortedMap<Long, Double> filterMap, List<String> tokens) {
-        long size = filterMap.lastKey();
-        for (long key : filterMap.keySet()) {
-            if (key < size && key >= tokens.size()) {
-                size = key;
+    private Row generatedRow(List<FilterField> fieldList, List<String> tokens) {
+        FilterField current = fieldList.get(fieldList.size() - 1);
+        for (FilterField field : fieldList) {
+            Long expected = field.expected();
+            if (expected < current.expected() && expected >= tokens.size()) {
+                current = field;
+            }
         }
-        BloomFilter bf = BloomFilter.create(size, filterMap.get(size));
+        BloomFilter bf = BloomFilter.create(current.expectedIntValue(), current.fpp());
         tokens.forEach(bf::put);
         ByteArrayOutputStream baos = new ByteArrayOutputStream();
         Assertions.assertDoesNotThrow(() -> {