From dab8d2fc9b6cb01b1f19d0b2ee5e06bd1a9f8141 Mon Sep 17 00:00:00 2001 From: elliVM <47@teragrep.com> Date: Wed, 9 Oct 2024 15:54:25 +0300 Subject: [PATCH 1/7] Add exception handling for json reading and use FilterField object to store filter size field options --- .../steps/teragrep/TeragrepBloomStep.java | 37 ++++---- .../teragrep/bloomfilter/FilterField.java | 94 +++++++++++++++++++ .../teragrep/bloomfilter/FilterTypes.java | 41 ++++---- .../bloomfilter/TeragrepBloomFilter.java | 16 +++- .../teragrep/bloomfilter/FilterTypesTest.java | 46 +++++++-- .../bloomfilter/TeragrepBloomFilterTest.java | 55 ++++++----- 6 files changed, 210 insertions(+), 79 deletions(-) create mode 100644 src/main/java/com/teragrep/pth10/steps/teragrep/bloomfilter/FilterField.java diff --git a/src/main/java/com/teragrep/pth10/steps/teragrep/TeragrepBloomStep.java b/src/main/java/com/teragrep/pth10/steps/teragrep/TeragrepBloomStep.java index 55df783..ae85e01 100644 --- a/src/main/java/com/teragrep/pth10/steps/teragrep/TeragrepBloomStep.java +++ b/src/main/java/com/teragrep/pth10/steps/teragrep/TeragrepBloomStep.java @@ -47,10 +47,7 @@ import com.teragrep.functions.dpf_03.BloomFilterAggregator; import com.teragrep.pth10.steps.AbstractStep; -import com.teragrep.pth10.steps.teragrep.bloomfilter.BloomFilterForeachPartitionFunction; -import com.teragrep.pth10.steps.teragrep.bloomfilter.BloomFilterTable; -import com.teragrep.pth10.steps.teragrep.bloomfilter.FilterTypes; -import com.teragrep.pth10.steps.teragrep.bloomfilter.LazyConnection; +import com.teragrep.pth10.steps.teragrep.bloomfilter.*; import com.typesafe.config.Config; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; @@ -61,8 +58,7 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; -import java.util.Map; -import java.util.SortedMap; +import java.util.*; /** * teragrep exec bloom @@ -162,32 +158,35 @@ private Dataset estimateSize(Dataset dataset) { } public Dataset aggregate(Dataset dataset) { - - FilterTypes filterTypes = new FilterTypes(this.zeppelinConfig); - - BloomFilterAggregator agg = new BloomFilterAggregator(inputCol, estimateCol, filterTypes.sortedMap()); - + final FilterTypes filterTypes = new FilterTypes(this.zeppelinConfig); + final SortedMap map = new TreeMap<>(); + final List fieldList = filterTypes.fieldList(); + for (final FilterField field : fieldList) { + map.put(field.expected(), field.fpp()); + } + final BloomFilterAggregator agg = new BloomFilterAggregator(inputCol, estimateCol, map); return dataset.groupBy("partition").agg(agg.toColumn().as("bloomfilter")); - } private void writeFilterTypes(final Config config) { final FilterTypes filterTypes = new FilterTypes(config); final Connection connection = new LazyConnection(config).get(); - final SortedMap filterSizeMap = filterTypes.sortedMap(); + final List fieldList = filterTypes.fieldList(); final String pattern = filterTypes.pattern(); - for (final Map.Entry entry : filterSizeMap.entrySet()) { + for (final FilterField field : fieldList) { + final int expectedInt = field.expectedIntValue(); + final double fpp = field.fpp(); if (LOGGER.isInfoEnabled()) { LOGGER .info( - "Writing filtertype (expected <[{}]>, fpp: <[{}]>, pattern: <[{}]>)", entry.getKey(), - entry.getValue(), pattern + "Writing filtertype (expected <[{}]>, fpp: <[{}]>, pattern: <[{}]>)", expectedInt, fpp, + pattern ); } final String sql = "INSERT IGNORE INTO `filtertype` (`expectedElements`, `targetFpp`, `pattern`) VALUES (?, ?, ?)"; try (final PreparedStatement 
stmt = connection.prepareStatement(sql)) { - stmt.setInt(1, entry.getKey().intValue()); // filtertype.expectedElements - stmt.setDouble(2, entry.getValue()); // filtertype.targetFpp + stmt.setInt(1, expectedInt); // filtertype.expectedElements + stmt.setDouble(2, fpp); // filtertype.targetFpp stmt.setString(3, pattern); // filtertype.pattern stmt.executeUpdate(); stmt.clearParameters(); @@ -198,7 +197,7 @@ private void writeFilterTypes(final Config config) { LOGGER .error( "Error writing filter[expected: <{}>, fpp: <{}>, pattern: <{}>] into database", - entry.getKey(), entry.getValue(), pattern + expectedInt, fpp, pattern ); } throw new RuntimeException(e); diff --git a/src/main/java/com/teragrep/pth10/steps/teragrep/bloomfilter/FilterField.java b/src/main/java/com/teragrep/pth10/steps/teragrep/bloomfilter/FilterField.java new file mode 100644 index 0000000..59d7af3 --- /dev/null +++ b/src/main/java/com/teragrep/pth10/steps/teragrep/bloomfilter/FilterField.java @@ -0,0 +1,94 @@ +/* + * Teragrep Data Processing Language (DPL) translator for Apache Spark (pth_10) + * Copyright (C) 2019-2024 Suomen Kanuuna Oy + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * + * Additional permission under GNU Affero General Public License version 3 + * section 7 + * + * If you modify this Program, or any covered work, by linking or combining it + * with other code, such other code is not for that reason alone subject to any + * of the requirements of the GNU Affero GPL version 3 as long as this Program + * is the same Program as licensed from Suomen Kanuuna Oy without any additional + * modifications. + * + * Supplemented terms under GNU Affero General Public License version 3 + * section 7 + * + * Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified + * versions must be marked as "Modified version of" The Program. + * + * Names of the licensors and authors may not be used for publicity purposes. + * + * No rights are granted for use of trade names, trademarks, or service marks + * which are in The Program if any. + * + * Licensee must indemnify licensors and authors for any liability that these + * contractual assumptions impose on licensors and authors. + * + * To the extent this program is licensed as part of the Commercial versions of + * Teragrep, the applicable Commercial License may apply to this file if you as + * a licensee so wish it. 
+ */ +package com.teragrep.pth10.steps.teragrep.bloomfilter; + +import org.apache.spark.util.sketch.BloomFilter; + +import java.util.Objects; + +public class FilterField { + + private final Long expected; + private final Double fpp; + + public FilterField(long expected, double fpp) { + this.expected = expected; + this.fpp = fpp; + } + + public Long expected() { + return expected; + } + + public int expectedIntValue() { + return expected.intValue(); + } + + public Double fpp() { + return fpp; + } + + public long bitSize() { + return BloomFilter.create(expected, fpp).bitSize(); + } + + @Override + public boolean equals(Object object) { + if (this == object) + return true; + if (object == null) + return false; + if (object.getClass() != this.getClass()) + return false; + final FilterField cast = (FilterField) object; + return this.expected.equals(cast.expected) && this.fpp.equals(cast.fpp); + } + + @Override + public int hashCode() { + return Objects.hash(expected, fpp); + } +} diff --git a/src/main/java/com/teragrep/pth10/steps/teragrep/bloomfilter/FilterTypes.java b/src/main/java/com/teragrep/pth10/steps/teragrep/bloomfilter/FilterTypes.java index 6bd7657..d6505d3 100644 --- a/src/main/java/com/teragrep/pth10/steps/teragrep/bloomfilter/FilterTypes.java +++ b/src/main/java/com/teragrep/pth10/steps/teragrep/bloomfilter/FilterTypes.java @@ -46,9 +46,7 @@ package com.teragrep.pth10.steps.teragrep.bloomfilter; import com.google.gson.Gson; -import com.google.gson.JsonObject; import com.typesafe.config.Config; -import org.apache.spark.util.sketch.BloomFilter; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.sparkproject.guava.reflect.TypeToken; @@ -73,35 +71,28 @@ public FilterTypes(Config config) { * * @return SortedMap of filter configuration */ - public SortedMap sortedMap() { - final SortedMap sizesMapFromJson = new TreeMap<>(); - final Gson gson = new Gson(); - final List jsonArray = gson.fromJson(sizesJsonString(), new TypeToken>() { - }.getType()); - for (final JsonObject object : jsonArray) { - if (object.has("expected") && object.has("fpp")) { - final Long expectedNumOfItems = Long.parseLong(object.get("expected").toString()); - final Double fpp = Double.parseDouble(object.get("fpp").toString()); - if (sizesMapFromJson.containsKey(expectedNumOfItems)) { - LOGGER.error("Duplicate value of expected number of items value: <[{}]>", expectedNumOfItems); - throw new RuntimeException("Duplicate entry expected num of items"); - } - sizesMapFromJson.put(expectedNumOfItems, fpp); - } - else { - throw new RuntimeException("JSON did not have expected values of 'expected' or 'fpp'"); - } + public List fieldList() { + final List fieldList; + try { + final Gson gson = new Gson(); + fieldList = gson.fromJson(sizesJsonString(), new TypeToken>() { + }.getType()); + } + catch (Exception e) { + throw new RuntimeException( + "Error reading 'dpl.pth_06.bloom.db.fields' option to JSON, check that option is formated as JSON array and that there are no duplicate values: " + + e.getMessage() + ); } - return sizesMapFromJson; + return fieldList; } public Map bitSizeMap() { - final Map filterSizes = sortedMap(); + final List fieldList = fieldList(); final Map bitsizeToExpectedItemsMap = new HashMap<>(); // Calculate bitSizes - for (final Map.Entry entry : filterSizes.entrySet()) { - final BloomFilter bf = BloomFilter.create(entry.getKey(), entry.getValue()); - bitsizeToExpectedItemsMap.put(bf.bitSize(), entry.getKey()); + for (final FilterField field : fieldList) { + 
bitsizeToExpectedItemsMap.put(field.bitSize(), field.expected()); } return bitsizeToExpectedItemsMap; } diff --git a/src/main/java/com/teragrep/pth10/steps/teragrep/bloomfilter/TeragrepBloomFilter.java b/src/main/java/com/teragrep/pth10/steps/teragrep/bloomfilter/TeragrepBloomFilter.java index 257d843..fb2f4f7 100644 --- a/src/main/java/com/teragrep/pth10/steps/teragrep/bloomfilter/TeragrepBloomFilter.java +++ b/src/main/java/com/teragrep/pth10/steps/teragrep/bloomfilter/TeragrepBloomFilter.java @@ -54,8 +54,10 @@ import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.SQLException; +import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -91,9 +93,17 @@ public void saveFilter(final Boolean overwrite) { final Double selectedFpp; final Map bitSizeMap = filterTypes.bitSizeMap(); if (bitSizeMap.containsKey(bitSize)) { - final long expectedItems = bitSizeMap.get(bitSize); - selectedExpectedNumOfItems = expectedItems; - selectedFpp = filterTypes.sortedMap().get(expectedItems); + selectedExpectedNumOfItems = bitSizeMap.get(bitSize); + List fppList = filterTypes + .fieldList() + .stream() + .filter(f -> f.expected().equals(selectedExpectedNumOfItems)) + .map(FilterField::fpp) + .collect(Collectors.toList()); + if (fppList.size() != 1) { + throw new RuntimeException("Could not find fpp value for bit size: <[" + bitSize + "]>"); + } + selectedFpp = fppList.get(0); } else { throw new IllegalArgumentException("no such filterSize <[" + bitSize + "]>"); diff --git a/src/test/java/com/teragrep/pth10/steps/teragrep/bloomfilter/FilterTypesTest.java b/src/test/java/com/teragrep/pth10/steps/teragrep/bloomfilter/FilterTypesTest.java index ac432ec..3099fb7 100644 --- a/src/test/java/com/teragrep/pth10/steps/teragrep/bloomfilter/FilterTypesTest.java +++ b/src/test/java/com/teragrep/pth10/steps/teragrep/bloomfilter/FilterTypesTest.java @@ -51,6 +51,7 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; +import java.util.List; import java.util.Map; import java.util.Properties; @@ -59,7 +60,7 @@ class FilterTypesTest { @Test - public void testSortedMapMethod() { + public void testFieldListMethod() { Properties properties = new Properties(); properties .put( @@ -69,12 +70,43 @@ public void testSortedMapMethod() { ); Config config = ConfigFactory.parseProperties(properties); FilterTypes filterTypes = new FilterTypes(config); - Map resultMap = filterTypes.sortedMap(); - assertEquals(0.01, resultMap.get(1000L)); - assertEquals(0.01, resultMap.get(2000L)); - assertEquals(0.01, resultMap.get(3000L)); - assertEquals(3, resultMap.size()); + List fieldList = filterTypes.fieldList(); + assertEquals(0.01, fieldList.get(0).fpp()); + assertEquals(0.01, fieldList.get(1).fpp()); + assertEquals(0.01, fieldList.get(2).fpp()); + assertEquals(3, fieldList.size()); + } + + @Test + public void testMalformattedFieldsOption() { + Properties properties = new Properties(); + properties + .put( + "dpl.pth_06.bloom.db.fields", + "[" + "{expected: 1000, fpp: 0.01}," + "{expected: 2000, fpp: 0.01}," + + "{expected: 3000, fpp: 0.01}" + ); + Config config = ConfigFactory.parseProperties(properties); + FilterTypes filterTypes = new FilterTypes(config); + RuntimeException exception = assertThrows(RuntimeException.class, filterTypes::fieldList); + String expectedError = "Error reading 'dpl.pth_06.bloom.db.fields' option to JSON, check that option is formated as JSON array and that there 
are no duplicate values: "; + Assertions.assertTrue(exception.getMessage().contains(expectedError)); + } + @Test + public void testDuplicateValues() { + Properties properties = new Properties(); + properties + .put( + "dpl.pth_06.bloom.db.fields", + "[" + "{expected: 1000, fpp: 0.01}," + "{expected: 1000, fpp: 0.01}," + + "{expected: 3000, fpp: 0.01}" + ); + Config config = ConfigFactory.parseProperties(properties); + FilterTypes filterTypes = new FilterTypes(config); + RuntimeException exception = assertThrows(RuntimeException.class, filterTypes::fieldList); + String expectedError = "Error reading 'dpl.pth_06.bloom.db.fields' option to JSON, check that option is formated as JSON array and that there are no duplicate values: "; + Assertions.assertTrue(exception.getMessage().contains(expectedError)); } @Test @@ -131,7 +163,7 @@ public void testEquals() { Config config = ConfigFactory.parseProperties(properties); FilterTypes filterTypes1 = new FilterTypes(config); FilterTypes filterTypes2 = new FilterTypes(config); - filterTypes1.sortedMap(); + filterTypes1.fieldList(); filterTypes1.pattern(); filterTypes1.tableName(); filterTypes1.bitSizeMap(); diff --git a/src/test/java/com/teragrep/pth10/steps/teragrep/bloomfilter/TeragrepBloomFilterTest.java b/src/test/java/com/teragrep/pth10/steps/teragrep/bloomfilter/TeragrepBloomFilterTest.java index a354f06..dc71080 100644 --- a/src/test/java/com/teragrep/pth10/steps/teragrep/bloomfilter/TeragrepBloomFilterTest.java +++ b/src/test/java/com/teragrep/pth10/steps/teragrep/bloomfilter/TeragrepBloomFilterTest.java @@ -66,7 +66,7 @@ class TeragrepBloomFilterTest { private LazyConnection lazyConnection; private FilterTypes filterTypes; private final BloomFilter emptyFilter = BloomFilter.create(100, 0.01); - private SortedMap sizeMap; + private List filterFields; private final String tableName = "bloomfilter_test"; @BeforeAll @@ -96,7 +96,7 @@ void setEnv() { Class.forName("org.h2.Driver"); }); filterTypes = new FilterTypes(config); - sizeMap = filterTypes.sortedMap(); + filterFields = filterTypes.fieldList(); String createFilterType = "CREATE TABLE `filtertype` (" + "`id` bigint(20) UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY," + "`expectedElements` bigint(20) NOT NULL," + "`targetFpp` DOUBLE UNSIGNED NOT NULL," @@ -112,12 +112,12 @@ void setEnv() { conn.prepareStatement(createTable).execute(); }); int loops = 0; - for (Map.Entry entry : sizeMap.entrySet()) { + for (FilterField field : filterFields) { loops++; Assertions.assertDoesNotThrow(() -> { PreparedStatement stmt = conn.prepareStatement(insertSql); - stmt.setInt(1, entry.getKey().intValue()); // filtertype.expectedElements - stmt.setDouble(2, entry.getValue()); // filtertype.targetFpp + stmt.setInt(1, field.expectedIntValue()); // filtertype.expectedElements + stmt.setDouble(2, field.fpp()); // filtertype.targetFpp stmt.setString(3, pattern); stmt.executeUpdate(); stmt.clearParameters(); @@ -140,14 +140,14 @@ public void tearDown() { @Test void testSavingToDatabase() { List tokens = new ArrayList<>(Collections.singletonList("one")); - Row row = generatedRow(sizeMap, tokens); + Row row = generatedRow(filterFields, tokens); String partition = row.getString(0); byte[] filterBytes = (byte[]) row.get(1); BloomFilter rawFilter = Assertions .assertDoesNotThrow(() -> BloomFilter.readFrom(new ByteArrayInputStream(filterBytes))); TeragrepBloomFilter filter = new TeragrepBloomFilter(partition, rawFilter, lazyConnection.get(), filterTypes); filter.saveFilter(false); - Map.Entry entry = 
sizeMap.entrySet().iterator().next(); + FilterField first = filterFields.get(0); String sql = "SELECT `filter` FROM `" + tableName + "`"; Assertions.assertDoesNotThrow(() -> { ResultSet rs = lazyConnection.get().prepareStatement(sql).executeQuery(); @@ -165,7 +165,7 @@ void testSavingToDatabase() { Assertions.assertEquals(1, cols); Assertions.assertTrue(resultFilter.mightContain("one")); Assertions.assertFalse(resultFilter.mightContain("neo")); - Assertions.assertTrue(resultFilter.expectedFpp() <= entry.getValue()); + Assertions.assertTrue(resultFilter.expectedFpp() <= first.fpp()); rs.close(); }); } @@ -173,7 +173,7 @@ void testSavingToDatabase() { @Test void testSavingToDatabaseWithOverwrite() { List tokens = new ArrayList<>(Collections.singletonList("one")); - Row row = generatedRow(sizeMap, tokens); + Row row = generatedRow(filterFields, tokens); String partition = row.getString(0); byte[] filterBytes = (byte[]) row.get(1); BloomFilter rawFilter = Assertions @@ -199,7 +199,7 @@ void testSavingToDatabaseWithOverwrite() { }); // Create second filter that will overwrite first one List secondTokens = new ArrayList<>(Collections.singletonList("neo")); - Row secondRow = generatedRow(sizeMap, secondTokens); + Row secondRow = generatedRow(filterFields, secondTokens); String secondPartition = secondRow.getString(0); byte[] secondFilterBytes = (byte[]) secondRow.get(1); BloomFilter rawFilter2 = Assertions @@ -238,7 +238,7 @@ void testSavingToDatabaseWithOverwrite() { void testCorrectFilterSizeSelection() { for (int i = 1; i < 1500; i++) { tokens.add("token:" + i); } - Row row = generatedRow(sizeMap, tokens); + Row row = generatedRow(filterFields, tokens); String partition = row.getString(0); byte[] filterBytes = (byte[]) row.get(1); BloomFilter rawFilter = Assertions @@ -246,13 +246,16 @@ void testCorrectFilterSizeSelection() { TeragrepBloomFilter filter = new TeragrepBloomFilter(partition, rawFilter, lazyConnection.get(), filterTypes); filter.saveFilter(false); long size = Long.MAX_VALUE; - for (long key : sizeMap.keySet()) { - if (size > key && key >= tokens.size()) { - size = key; + FilterField current = null; + for (FilterField field : filterFields) { + long expected = field.expected(); + if (size > expected && expected >= tokens.size()) { + size = expected; + current = field; } } - Double fpp = sizeMap.get(size); String sql = "SELECT `filter` FROM `" + tableName + "`"; + FilterField finalCurrent = current; Assertions.assertDoesNotThrow(() -> { ResultSet rs = lazyConnection.get().prepareStatement(sql).executeQuery(); int cols = rs.getMetaData().getColumnCount(); @@ -269,7 +272,8 @@ void testCorrectFilterSizeSelection() { Assertions.assertEquals(1, cols); Assertions.assertTrue(resultFilter.mightContain("one")); Assertions.assertFalse(resultFilter.mightContain("neo")); - Assertions.assertTrue(resultFilter.expectedFpp() <= fpp); + Assertions.assertNotNull(finalCurrent); + Assertions.assertTrue(resultFilter.expectedFpp() <= finalCurrent.fpp()); rs.close(); }); } @@ -290,7 +294,7 @@ public void testPatternSavedToDatabase() { @Test public void testEquals() { List tokens = new ArrayList<>(Collections.singletonList("one")); - Row row = generatedRow(sizeMap, tokens); + Row row = generatedRow(filterFields, tokens); String partition = row.getString(0); byte[] filterBytes = (byte[]) row.get(1); BloomFilter rawFilter = Assertions @@ -305,8 +309,8 @@ public void testEquals() { public void testNotEqualsTokens() { List tokens1 = new ArrayList<>(Collections.singletonList("one")); List tokens2 = new ArrayList<>(Collections.singletonList("two")); 
- Row row1 = generatedRow(sizeMap, tokens1); - Row row2 = generatedRow(sizeMap, tokens2); + Row row1 = generatedRow(filterFields, tokens1); + Row row2 = generatedRow(filterFields, tokens2); BloomFilter rawFilter1 = Assertions .assertDoesNotThrow(() -> BloomFilter.readFrom(new ByteArrayInputStream((byte[]) row1.get(1)))); BloomFilter rawFilter2 = Assertions @@ -328,14 +332,15 @@ public void testNotEqualsTokens() { // -- Helper methods -- - private Row generatedRow(SortedMap filterMap, List tokens) { - long size = filterMap.lastKey(); - for (long key : filterMap.keySet()) { - if (key < size && key >= tokens.size()) { - size = key; + private Row generatedRow(List fieldList, List tokens) { + FilterField current = fieldList.get(fieldList.size() - 1); + for (FilterField field : fieldList) { + Long expected = field.expected(); + if (expected < current.expected() && expected >= tokens.size()) { + current = field; } } - BloomFilter bf = BloomFilter.create(size, filterMap.get(size)); + BloomFilter bf = BloomFilter.create(current.expectedIntValue(), current.fpp()); tokens.forEach(bf::put); ByteArrayOutputStream baos = new ByteArrayOutputStream(); Assertions.assertDoesNotThrow(() -> { From 306f20d2b28348aa47318a78aca606c5bcb57328 Mon Sep 17 00:00:00 2001 From: elliVM <47@teragrep.com> Date: Wed, 9 Oct 2024 16:08:26 +0300 Subject: [PATCH 2/7] add FilterFieldTest --- .../teragrep/bloomfilter/FilterFieldTest.java | 87 +++++++++++++++++++ 1 file changed, 87 insertions(+) create mode 100644 src/test/java/com/teragrep/pth10/steps/teragrep/bloomfilter/FilterFieldTest.java diff --git a/src/test/java/com/teragrep/pth10/steps/teragrep/bloomfilter/FilterFieldTest.java b/src/test/java/com/teragrep/pth10/steps/teragrep/bloomfilter/FilterFieldTest.java new file mode 100644 index 0000000..2984e3f --- /dev/null +++ b/src/test/java/com/teragrep/pth10/steps/teragrep/bloomfilter/FilterFieldTest.java @@ -0,0 +1,87 @@ +/* + * Teragrep Data Processing Language (DPL) translator for Apache Spark (pth_10) + * Copyright (C) 2019-2024 Suomen Kanuuna Oy + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + * + * + * Additional permission under GNU Affero General Public License version 3 + * section 7 + * + * If you modify this Program, or any covered work, by linking or combining it + * with other code, such other code is not for that reason alone subject to any + * of the requirements of the GNU Affero GPL version 3 as long as this Program + * is the same Program as licensed from Suomen Kanuuna Oy without any additional + * modifications. + * + * Supplemented terms under GNU Affero General Public License version 3 + * section 7 + * + * Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified + * versions must be marked as "Modified version of" The Program. + * + * Names of the licensors and authors may not be used for publicity purposes. 
+ * + * No rights are granted for use of trade names, trademarks, or service marks + * which are in The Program if any. + * + * Licensee must indemnify licensors and authors for any liability that these + * contractual assumptions impose on licensors and authors. + * + * To the extent this program is licensed as part of the Commercial versions of + * Teragrep, the applicable Commercial License may apply to this file if you as + * a licensee so wish it. + */ +package com.teragrep.pth10.steps.teragrep.bloomfilter; + +import org.apache.spark.util.sketch.BloomFilter; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class FilterFieldTest { + + @Test + public void testBitSize() { + long expected = BloomFilter.create(1000, 0.01).bitSize(); + long result = new FilterField(1000, 0.01).bitSize(); + Assertions.assertEquals(expected, result); + } + + @Test + public void testEquality() { + FilterField field1 = new FilterField(10, 0.01); + FilterField field2 = new FilterField(10, 0.01); + Assertions.assertEquals(field1, field2); + } + + @Test + public void testNonEquals() { + FilterField field1 = new FilterField(10, 0.01); + FilterField field2 = new FilterField(100, 0.01); + FilterField field3 = new FilterField(10, 0.02); + Assertions.assertNotEquals(field1, field2); + Assertions.assertNotEquals(field1, field3); + } + + @Test + public void testHashCode() { + FilterField field1 = new FilterField(10, 0.01); + FilterField field2 = new FilterField(10, 0.01); + FilterField field3 = new FilterField(11, 0.01); + FilterField field4 = new FilterField(10, 0.02); + Assertions.assertEquals(field1.hashCode(), field2.hashCode()); + Assertions.assertNotEquals(field1.hashCode(), field3.hashCode()); + Assertions.assertNotEquals(field1.hashCode(), field4.hashCode()); + } +} From 27c1ff22e792e86a4f2ff4aacd3c1763ec2586b9 Mon Sep 17 00:00:00 2001 From: elliVM <47@teragrep.com> Date: Wed, 9 Oct 2024 16:16:02 +0300 Subject: [PATCH 3/7] test also expected values in testFieldListMethod --- .../pth10/steps/teragrep/bloomfilter/FilterTypesTest.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/test/java/com/teragrep/pth10/steps/teragrep/bloomfilter/FilterTypesTest.java b/src/test/java/com/teragrep/pth10/steps/teragrep/bloomfilter/FilterTypesTest.java index 3099fb7..952d7c4 100644 --- a/src/test/java/com/teragrep/pth10/steps/teragrep/bloomfilter/FilterTypesTest.java +++ b/src/test/java/com/teragrep/pth10/steps/teragrep/bloomfilter/FilterTypesTest.java @@ -71,6 +71,9 @@ public void testFieldListMethod() { Config config = ConfigFactory.parseProperties(properties); FilterTypes filterTypes = new FilterTypes(config); List fieldList = filterTypes.fieldList(); + assertEquals(1000, fieldList.get(0).expected()); + assertEquals(2000, fieldList.get(1).expected()); + assertEquals(3000, fieldList.get(2).expected()); assertEquals(0.01, fieldList.get(0).fpp()); assertEquals(0.01, fieldList.get(1).fpp()); assertEquals(0.01, fieldList.get(2).fpp()); From 52344598a3b481cd70654a5ad50b32e2383cd418 Mon Sep 17 00:00:00 2001 From: elliVM <47@teragrep.com> Date: Thu, 10 Oct 2024 07:52:40 +0300 Subject: [PATCH 4/7] throw exception if duplicate FilterField --- .../pth10/steps/teragrep/bloomfilter/FilterTypes.java | 6 ++++++ .../pth10/steps/teragrep/bloomfilter/FilterTypesTest.java | 6 +++--- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/teragrep/pth10/steps/teragrep/bloomfilter/FilterTypes.java 
index d6505d3..4425ce5 100644 --- a/src/main/java/com/teragrep/pth10/steps/teragrep/bloomfilter/FilterTypes.java +++ b/src/main/java/com/teragrep/pth10/steps/teragrep/bloomfilter/FilterTypes.java @@ -53,6 +53,7 @@ import java.io.Serializable; import java.util.*; +import java.util.stream.Collectors; public final class FilterTypes implements Serializable { @@ -84,11 +85,16 @@ public List fieldList() { + e.getMessage() ); } + final boolean hasDuplicates = new HashSet<>(fieldList).size() != fieldList.size(); + if (hasDuplicates) { + throw new RuntimeException("Found duplicate values in 'dpl.pth_06.bloom.db.fields'"); + } return fieldList; } public Map bitSizeMap() { final List fieldList = fieldList(); + System.out.println(fieldList); final Map bitsizeToExpectedItemsMap = new HashMap<>(); // Calculate bitSizes for (final FilterField field : fieldList) { diff --git a/src/test/java/com/teragrep/pth10/steps/teragrep/bloomfilter/FilterTypesTest.java b/src/test/java/com/teragrep/pth10/steps/teragrep/bloomfilter/FilterTypesTest.java index 952d7c4..0b0b332 100644 --- a/src/test/java/com/teragrep/pth10/steps/teragrep/bloomfilter/FilterTypesTest.java +++ b/src/test/java/com/teragrep/pth10/steps/teragrep/bloomfilter/FilterTypesTest.java @@ -103,13 +103,13 @@ public void testDuplicateValues() { .put( "dpl.pth_06.bloom.db.fields", "[" + "{expected: 1000, fpp: 0.01}," + "{expected: 1000, fpp: 0.01}," - + "{expected: 3000, fpp: 0.01}" + + "{expected: 3000, fpp: 0.01}]" ); Config config = ConfigFactory.parseProperties(properties); FilterTypes filterTypes = new FilterTypes(config); RuntimeException exception = assertThrows(RuntimeException.class, filterTypes::fieldList); - String expectedError = "Error reading 'dpl.pth_06.bloom.db.fields' option to JSON, check that option is formated as JSON array and that there are no duplicate values: "; - Assertions.assertTrue(exception.getMessage().contains(expectedError)); + String expectedError = "Found duplicate values in 'dpl.pth_06.bloom.db.fields'"; + Assertions.assertEquals(expectedError, exception.getMessage()); } @Test From 91649b8b5835526a2639fd03f015d8527cfe0d9b Mon Sep 17 00:00:00 2001 From: elliVM <47@teragrep.com> Date: Thu, 10 Oct 2024 12:27:34 +0300 Subject: [PATCH 5/7] create FilterTypes and LazyConnection in TeragrepBloomStep constructor, catch correct exception types from gson.fromJson(), FilterField class final --- .../steps/teragrep/TeragrepBloomStep.java | 32 ++++++++++++++++--- .../teragrep/bloomfilter/FilterField.java | 2 +- .../teragrep/bloomfilter/FilterTypes.java | 6 ++-- 3 files changed, 31 insertions(+), 9 deletions(-) diff --git a/src/main/java/com/teragrep/pth10/steps/teragrep/TeragrepBloomStep.java b/src/main/java/com/teragrep/pth10/steps/teragrep/TeragrepBloomStep.java index ae85e01..176107d 100644 --- a/src/main/java/com/teragrep/pth10/steps/teragrep/TeragrepBloomStep.java +++ b/src/main/java/com/teragrep/pth10/steps/teragrep/TeragrepBloomStep.java @@ -76,6 +76,8 @@ public enum BloomMode { private final String inputCol; private final String outputCol; private final String estimateCol; + private final FilterTypes filterTypes; + private final LazyConnection connection; public TeragrepBloomStep( Config zeppelinConfig, @@ -83,12 +85,34 @@ public TeragrepBloomStep( String inputCol, String outputCol, String estimateCol + ) { + this( + zeppelinConfig, + mode, + inputCol, + outputCol, + estimateCol, + new FilterTypes(zeppelinConfig), + new LazyConnection(zeppelinConfig) + ); + } + + public TeragrepBloomStep( + Config zeppelinConfig, + BloomMode mode, 
+ String inputCol, + String outputCol, + String estimateCol, + FilterTypes filterTypes, + LazyConnection connection ) { this.zeppelinConfig = zeppelinConfig; this.mode = mode; this.inputCol = inputCol; this.outputCol = outputCol; this.estimateCol = estimateCol; + this.connection = connection; + this.filterTypes = filterTypes; if (mode == BloomMode.ESTIMATE || mode == BloomMode.AGGREGATE) { // estimate is run as an aggregation @@ -158,7 +182,6 @@ private Dataset estimateSize(Dataset dataset) { } public Dataset aggregate(Dataset dataset) { - final FilterTypes filterTypes = new FilterTypes(this.zeppelinConfig); final SortedMap map = new TreeMap<>(); final List fieldList = filterTypes.fieldList(); for (final FilterField field : fieldList) { @@ -169,10 +192,9 @@ public Dataset aggregate(Dataset dataset) { } private void writeFilterTypes(final Config config) { - final FilterTypes filterTypes = new FilterTypes(config); - final Connection connection = new LazyConnection(config).get(); final List fieldList = filterTypes.fieldList(); final String pattern = filterTypes.pattern(); + final Connection conn = connection.get(); for (final FilterField field : fieldList) { final int expectedInt = field.expectedIntValue(); final double fpp = field.fpp(); @@ -184,13 +206,13 @@ private void writeFilterTypes(final Config config) { ); } final String sql = "INSERT IGNORE INTO `filtertype` (`expectedElements`, `targetFpp`, `pattern`) VALUES (?, ?, ?)"; - try (final PreparedStatement stmt = connection.prepareStatement(sql)) { + try (final PreparedStatement stmt = conn.prepareStatement(sql)) { stmt.setInt(1, expectedInt); // filtertype.expectedElements stmt.setDouble(2, fpp); // filtertype.targetFpp stmt.setString(3, pattern); // filtertype.pattern stmt.executeUpdate(); stmt.clearParameters(); - connection.commit(); + conn.commit(); } catch (SQLException e) { if (LOGGER.isErrorEnabled()) { diff --git a/src/main/java/com/teragrep/pth10/steps/teragrep/bloomfilter/FilterField.java b/src/main/java/com/teragrep/pth10/steps/teragrep/bloomfilter/FilterField.java index 59d7af3..253a9b0 100644 --- a/src/main/java/com/teragrep/pth10/steps/teragrep/bloomfilter/FilterField.java +++ b/src/main/java/com/teragrep/pth10/steps/teragrep/bloomfilter/FilterField.java @@ -49,7 +49,7 @@ import java.util.Objects; -public class FilterField { +public final class FilterField { private final Long expected; private final Double fpp; diff --git a/src/main/java/com/teragrep/pth10/steps/teragrep/bloomfilter/FilterTypes.java b/src/main/java/com/teragrep/pth10/steps/teragrep/bloomfilter/FilterTypes.java index 4425ce5..4bb7a1f 100644 --- a/src/main/java/com/teragrep/pth10/steps/teragrep/bloomfilter/FilterTypes.java +++ b/src/main/java/com/teragrep/pth10/steps/teragrep/bloomfilter/FilterTypes.java @@ -46,6 +46,8 @@ package com.teragrep.pth10.steps.teragrep.bloomfilter; import com.google.gson.Gson; +import com.google.gson.JsonIOException; +import com.google.gson.JsonSyntaxException; import com.typesafe.config.Config; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,7 +55,6 @@ import java.io.Serializable; import java.util.*; -import java.util.stream.Collectors; public final class FilterTypes implements Serializable { @@ -79,7 +80,7 @@ public List fieldList() { fieldList = gson.fromJson(sizesJsonString(), new TypeToken>() { }.getType()); } - catch (Exception e) { + catch (JsonIOException | JsonSyntaxException e) { throw new RuntimeException( "Error reading 'dpl.pth_06.bloom.db.fields' option to JSON, check that option is formated as JSON 
array and that there are no duplicate values: " + e.getMessage() @@ -94,7 +95,6 @@ public List fieldList() { public Map bitSizeMap() { final List fieldList = fieldList(); - System.out.println(fieldList); final Map bitsizeToExpectedItemsMap = new HashMap<>(); // Calculate bitSizes for (final FilterField field : fieldList) { From 6ea0211ed334e3c9b94e33b3da683d181d9c4bd9 Mon Sep 17 00:00:00 2001 From: elliVM <47@teragrep.com> Date: Mon, 14 Oct 2024 10:14:10 +0300 Subject: [PATCH 6/7] add create table if not exists statement for filtertype --- .../steps/teragrep/TeragrepBloomStep.java | 25 ++++++++++++++++--- 1 file changed, 22 insertions(+), 3 deletions(-) diff --git a/src/main/java/com/teragrep/pth10/steps/teragrep/TeragrepBloomStep.java b/src/main/java/com/teragrep/pth10/steps/teragrep/TeragrepBloomStep.java index 176107d..967a958 100644 --- a/src/main/java/com/teragrep/pth10/steps/teragrep/TeragrepBloomStep.java +++ b/src/main/java/com/teragrep/pth10/steps/teragrep/TeragrepBloomStep.java @@ -153,7 +153,8 @@ public Dataset get(Dataset dataset) { * @return Dataset unmodified */ private Dataset createBloomFilter(Dataset dataset) { - writeFilterTypes(this.zeppelinConfig); + createFilterTypeTable(); + writeFilterTypes(); final BloomFilterTable table = new BloomFilterTable(zeppelinConfig); table.create(); dataset.foreachPartition(new BloomFilterForeachPartitionFunction(this.zeppelinConfig)); @@ -167,7 +168,8 @@ private Dataset createBloomFilter(Dataset dataset) { * @return Dataset unmodified */ private Dataset updateBloomFilter(Dataset dataset) { - writeFilterTypes(this.zeppelinConfig); + createFilterTypeTable(); + writeFilterTypes(); final BloomFilterTable table = new BloomFilterTable(zeppelinConfig); table.create(); dataset.foreachPartition(new BloomFilterForeachPartitionFunction(this.zeppelinConfig, true)); @@ -191,7 +193,7 @@ public Dataset aggregate(Dataset dataset) { return dataset.groupBy("partition").agg(agg.toColumn().as("bloomfilter")); } - private void writeFilterTypes(final Config config) { + private void writeFilterTypes() { final List fieldList = filterTypes.fieldList(); final String pattern = filterTypes.pattern(); final Connection conn = connection.get(); @@ -226,4 +228,21 @@ private void writeFilterTypes(final Config config) { } } } + + private void createFilterTypeTable() { + // from pth-06/database/bloomdb + final String sql = "CREATE TABLE IF NOT EXISTS `filtertype` (" + + " `id` bigint(20) UNSIGNED NOT NULL AUTO_INCREMENT PRIMARY KEY," + + " `expectedElements` bigint(20) UNSIGNED NOT NULL," + + " `targetFpp` DOUBLE(2, 2) UNSIGNED NOT NULL," + + " `pattern` VARCHAR(2048) NOT NULL," + + " UNIQUE KEY (`expectedElements`, `targetFpp`, `pattern`)" + ") ENGINE = InnoDB" + + " DEFAULT CHARSET = utf8mb4" + " COLLATE = utf8mb4_unicode_ci;"; + try (final PreparedStatement statement = connection.get().prepareStatement(sql)) { + statement.execute(); + } + catch (SQLException e) { + throw new RuntimeException("Error creating `filtertype` table: " + e.getMessage()); + } + } } From 57eca2f6b8bd3b0a6c823d04308152da0fcb92c0 Mon Sep 17 00:00:00 2001 From: elliVM <47@teragrep.com> Date: Mon, 14 Oct 2024 10:14:14 +0300 Subject: [PATCH 7/7] use non-persistent in-memory db for h2 testing --- .../pth10/steps/teragrep/bloomfilter/BloomFilterTableTest.java | 2 +- .../steps/teragrep/bloomfilter/TeragrepBloomFilterTest.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/java/com/teragrep/pth10/steps/teragrep/bloomfilter/BloomFilterTableTest.java 
b/src/test/java/com/teragrep/pth10/steps/teragrep/bloomfilter/BloomFilterTableTest.java index fe06e67..c9e3160 100644 --- a/src/test/java/com/teragrep/pth10/steps/teragrep/bloomfilter/BloomFilterTableTest.java +++ b/src/test/java/com/teragrep/pth10/steps/teragrep/bloomfilter/BloomFilterTableTest.java @@ -60,7 +60,7 @@ class BloomFilterTableTest { final String username = "sa"; final String password = ""; - final String connectionUrl = "jdbc:h2:~/test;MODE=MariaDB;DATABASE_TO_LOWER=TRUE;CASE_INSENSITIVE_IDENTIFIERS=TRUE"; + final String connectionUrl = "jdbc:h2:mem:test;MODE=MariaDB;DATABASE_TO_LOWER=TRUE;CASE_INSENSITIVE_IDENTIFIERS=TRUE"; @BeforeAll void setEnv() { diff --git a/src/test/java/com/teragrep/pth10/steps/teragrep/bloomfilter/TeragrepBloomFilterTest.java b/src/test/java/com/teragrep/pth10/steps/teragrep/bloomfilter/TeragrepBloomFilterTest.java index dc71080..6f84ae2 100644 --- a/src/test/java/com/teragrep/pth10/steps/teragrep/bloomfilter/TeragrepBloomFilterTest.java +++ b/src/test/java/com/teragrep/pth10/steps/teragrep/bloomfilter/TeragrepBloomFilterTest.java @@ -76,7 +76,7 @@ void setEnv() { properties.put("dpl.pth_10.bloom.db.username", username); String password = ""; properties.put("dpl.pth_10.bloom.db.password", password); - String connectionUrl = "jdbc:h2:~/test;MODE=MariaDB;DATABASE_TO_LOWER=TRUE;CASE_INSENSITIVE_IDENTIFIERS=TRUE"; + String connectionUrl = "jdbc:h2:mem:test;MODE=MariaDB;DATABASE_TO_LOWER=TRUE;CASE_INSENSITIVE_IDENTIFIERS=TRUE"; properties.put("dpl.pth_06.bloom.db.url", connectionUrl); properties .put(