Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add exception handling for bloom option json reading #370

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,7 @@

import com.teragrep.functions.dpf_03.BloomFilterAggregator;
import com.teragrep.pth10.steps.AbstractStep;
import com.teragrep.pth10.steps.teragrep.bloomfilter.BloomFilterForeachPartitionFunction;
import com.teragrep.pth10.steps.teragrep.bloomfilter.BloomFilterTable;
import com.teragrep.pth10.steps.teragrep.bloomfilter.FilterTypes;
import com.teragrep.pth10.steps.teragrep.bloomfilter.LazyConnection;
import com.teragrep.pth10.steps.teragrep.bloomfilter.*;
import com.typesafe.config.Config;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Expand All @@ -61,8 +58,7 @@
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Map;
import java.util.SortedMap;
import java.util.*;

/**
* teragrep exec bloom
Expand All @@ -80,19 +76,43 @@ public enum BloomMode {
private final String inputCol;
private final String outputCol;
private final String estimateCol;
private final FilterTypes filterTypes;
private final LazyConnection connection;

/**
 * Convenience constructor that builds the step's collaborators from the config.
 * <p>
 * A new {@code FilterTypes} and a new {@code LazyConnection} are each created
 * from {@code zeppelinConfig} and handed, together with the remaining
 * arguments, to the seven-argument constructor.
 *
 * @param zeppelinConfig config passed on to the delegated constructor and used
 *                       to construct the {@code FilterTypes} and
 *                       {@code LazyConnection}
 * @param mode           bloom mode of this step
 * @param inputCol       input column name, passed on unchanged
 * @param outputCol      output column name, passed on unchanged
 * @param estimateCol    estimate column name, passed on unchanged
 */
public TeragrepBloomStep(
Config zeppelinConfig,
BloomMode mode,
String inputCol,
String outputCol,
String estimateCol
) {
// Delegate so that all field assignment lives in one constructor.
this(
zeppelinConfig,
mode,
inputCol,
outputCol,
estimateCol,
new FilterTypes(zeppelinConfig),
new LazyConnection(zeppelinConfig)
);
}

public TeragrepBloomStep(
Config zeppelinConfig,
BloomMode mode,
String inputCol,
String outputCol,
String estimateCol,
FilterTypes filterTypes,
LazyConnection connection
) {
this.zeppelinConfig = zeppelinConfig;
this.mode = mode;
this.inputCol = inputCol;
this.outputCol = outputCol;
this.estimateCol = estimateCol;
this.connection = connection;
this.filterTypes = filterTypes;

if (mode == BloomMode.ESTIMATE || mode == BloomMode.AGGREGATE) {
// estimate is run as an aggregation
Expand Down Expand Up @@ -162,43 +182,44 @@ private Dataset<Row> estimateSize(Dataset<Row> dataset) {
}

public Dataset<Row> aggregate(Dataset<Row> dataset) {

FilterTypes filterTypes = new FilterTypes(this.zeppelinConfig);

BloomFilterAggregator agg = new BloomFilterAggregator(inputCol, estimateCol, filterTypes.sortedMap());

final SortedMap<Long, Double> map = new TreeMap<>();
final List<FilterField> fieldList = filterTypes.fieldList();
for (final FilterField field : fieldList) {
map.put(field.expected(), field.fpp());
}
final BloomFilterAggregator agg = new BloomFilterAggregator(inputCol, estimateCol, map);
51-code marked this conversation as resolved.
Show resolved Hide resolved
return dataset.groupBy("partition").agg(agg.toColumn().as("bloomfilter"));

}

private void writeFilterTypes(final Config config) {
final FilterTypes filterTypes = new FilterTypes(config);
final Connection connection = new LazyConnection(config).get();
final SortedMap<Long, Double> filterSizeMap = filterTypes.sortedMap();
final List<FilterField> fieldList = filterTypes.fieldList();
final String pattern = filterTypes.pattern();
for (final Map.Entry<Long, Double> entry : filterSizeMap.entrySet()) {
final Connection conn = connection.get();
for (final FilterField field : fieldList) {
final int expectedInt = field.expectedIntValue();
final double fpp = field.fpp();
if (LOGGER.isInfoEnabled()) {
LOGGER
.info(
"Writing filtertype (expected <[{}]>, fpp: <[{}]>, pattern: <[{}]>)", entry.getKey(),
entry.getValue(), pattern
"Writing filtertype (expected <[{}]>, fpp: <[{}]>, pattern: <[{}]>)", expectedInt, fpp,
pattern
);
}
final String sql = "INSERT IGNORE INTO `filtertype` (`expectedElements`, `targetFpp`, `pattern`) VALUES (?, ?, ?)";
try (final PreparedStatement stmt = connection.prepareStatement(sql)) {
stmt.setInt(1, entry.getKey().intValue()); // filtertype.expectedElements
stmt.setDouble(2, entry.getValue()); // filtertype.targetFpp
try (final PreparedStatement stmt = conn.prepareStatement(sql)) {
stmt.setInt(1, expectedInt); // filtertype.expectedElements
stmt.setDouble(2, fpp); // filtertype.targetFpp
stmt.setString(3, pattern); // filtertype.pattern
stmt.executeUpdate();
stmt.clearParameters();
connection.commit();
conn.commit();
}
catch (SQLException e) {
if (LOGGER.isErrorEnabled()) {
LOGGER
.error(
"Error writing filter[expected: <{}>, fpp: <{}>, pattern: <{}>] into database",
entry.getKey(), entry.getValue(), pattern
expectedInt, fpp, pattern
);
}
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Teragrep Data Processing Language (DPL) translator for Apache Spark (pth_10)
* Copyright (C) 2019-2024 Suomen Kanuuna Oy
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*
*
* Additional permission under GNU Affero General Public License version 3
* section 7
*
* If you modify this Program, or any covered work, by linking or combining it
* with other code, such other code is not for that reason alone subject to any
* of the requirements of the GNU Affero GPL version 3 as long as this Program
* is the same Program as licensed from Suomen Kanuuna Oy without any additional
* modifications.
*
* Supplemented terms under GNU Affero General Public License version 3
* section 7
*
* Origin of the software must be attributed to Suomen Kanuuna Oy. Any modified
* versions must be marked as "Modified version of" The Program.
*
* Names of the licensors and authors may not be used for publicity purposes.
*
* No rights are granted for use of trade names, trademarks, or service marks
* which are in The Program if any.
*
* Licensee must indemnify licensors and authors for any liability that these
* contractual assumptions impose on licensors and authors.
*
* To the extent this program is licensed as part of the Commercial versions of
* Teragrep, the applicable Commercial License may apply to this file if you as
* a licensee so wish it.
*/
package com.teragrep.pth10.steps.teragrep.bloomfilter;

import org.apache.spark.util.sketch.BloomFilter;

import java.util.Objects;

/**
 * Immutable pair of bloom filter sizing parameters: the expected number of
 * items and the target false positive probability (fpp).
 * <p>
 * Both values are held as boxed types. The public constructor always sets
 * them, but an instance created reflectively from JSON that omits a key may
 * carry a {@code null} field (NOTE(review): Gson is understood to bypass the
 * constructor for classes without a no-arg constructor - confirm). For that
 * reason {@link #equals(Object)} and {@link #hashCode()} are both null-safe,
 * so such instances can still be placed in hash-based collections.
 */
public final class FilterField {

    private final Long expected;
    private final Double fpp;

    /**
     * @param expected expected number of items in the filter
     * @param fpp      target false positive probability
     */
    public FilterField(long expected, double fpp) {
        this.expected = expected;
        this.fpp = fpp;
    }

    /**
     * @return the expected number of items
     */
    public Long expected() {
        return expected;
    }

    /**
     * Returns the expected number of items narrowed to an {@code int}.
     * <p>
     * This is a plain {@link Long#intValue()} conversion: values outside the
     * {@code int} range are truncated, not rejected.
     *
     * @return the expected number of items as an {@code int}
     */
    public int expectedIntValue() {
        return expected.intValue();
    }

    /**
     * @return the target false positive probability
     */
    public Double fpp() {
        return fpp;
    }

    /**
     * Creates a {@code BloomFilter} with this field's expected item count and
     * fpp and reports its size in bits.
     *
     * @return bit size of a filter created with these parameters
     */
    public long bitSize() {
        return BloomFilter.create(expected, fpp).bitSize();
    }

    /**
     * Two fields are equal when they are of the same class and both the
     * expected item count and the fpp are equal. Null field values are
     * compared safely instead of being dereferenced.
     */
    @Override
    public boolean equals(Object object) {
        if (this == object) {
            return true;
        }
        if (object == null || object.getClass() != this.getClass()) {
            return false;
        }
        final FilterField cast = (FilterField) object;
        // Objects.equals keeps this consistent with the null-tolerant hashCode below.
        return Objects.equals(this.expected, cast.expected) && Objects.equals(this.fpp, cast.fpp);
    }

    @Override
    public int hashCode() {
        return Objects.hash(expected, fpp);
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@
package com.teragrep.pth10.steps.teragrep.bloomfilter;

import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonIOException;
import com.google.gson.JsonSyntaxException;
import com.typesafe.config.Config;
import org.apache.spark.util.sketch.BloomFilter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.sparkproject.guava.reflect.TypeToken;
Expand All @@ -73,35 +73,32 @@ public FilterTypes(Config config) {
*
* @return SortedMap of filter configuration
*/
public SortedMap<Long, Double> sortedMap() {
final SortedMap<Long, Double> sizesMapFromJson = new TreeMap<>();
final Gson gson = new Gson();
final List<JsonObject> jsonArray = gson.fromJson(sizesJsonString(), new TypeToken<List<JsonObject>>() {
}.getType());
for (final JsonObject object : jsonArray) {
if (object.has("expected") && object.has("fpp")) {
final Long expectedNumOfItems = Long.parseLong(object.get("expected").toString());
final Double fpp = Double.parseDouble(object.get("fpp").toString());
if (sizesMapFromJson.containsKey(expectedNumOfItems)) {
LOGGER.error("Duplicate value of expected number of items value: <[{}]>", expectedNumOfItems);
throw new RuntimeException("Duplicate entry expected num of items");
}
sizesMapFromJson.put(expectedNumOfItems, fpp);
}
else {
throw new RuntimeException("JSON did not have expected values of 'expected' or 'fpp'");
}
public List<FilterField> fieldList() {
final List<FilterField> fieldList;
try {
final Gson gson = new Gson();
fieldList = gson.fromJson(sizesJsonString(), new TypeToken<List<FilterField>>() {
}.getType());
}
catch (JsonIOException | JsonSyntaxException e) {
throw new RuntimeException(
"Error reading 'dpl.pth_06.bloom.db.fields' option to JSON, check that option is formated as JSON array and that there are no duplicate values: "
+ e.getMessage()
);
}
final boolean hasDuplicates = new HashSet<>(fieldList).size() != fieldList.size();
if (hasDuplicates) {
throw new RuntimeException("Found duplicate values in 'dpl.pth_06.bloom.db.fields'");
}
return sizesMapFromJson;
return fieldList;
}

public Map<Long, Long> bitSizeMap() {
final Map<Long, Double> filterSizes = sortedMap();
final List<FilterField> fieldList = fieldList();
final Map<Long, Long> bitsizeToExpectedItemsMap = new HashMap<>();
// Calculate bitSizes
for (final Map.Entry<Long, Double> entry : filterSizes.entrySet()) {
final BloomFilter bf = BloomFilter.create(entry.getKey(), entry.getValue());
bitsizeToExpectedItemsMap.put(bf.bitSize(), entry.getKey());
for (final FilterField field : fieldList) {
bitsizeToExpectedItemsMap.put(field.bitSize(), field.expected());
}
return bitsizeToExpectedItemsMap;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,10 @@
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -91,9 +93,17 @@ public void saveFilter(final Boolean overwrite) {
final Double selectedFpp;
final Map<Long, Long> bitSizeMap = filterTypes.bitSizeMap();
if (bitSizeMap.containsKey(bitSize)) {
final long expectedItems = bitSizeMap.get(bitSize);
selectedExpectedNumOfItems = expectedItems;
selectedFpp = filterTypes.sortedMap().get(expectedItems);
selectedExpectedNumOfItems = bitSizeMap.get(bitSize);
List<Double> fppList = filterTypes
.fieldList()
.stream()
.filter(f -> f.expected().equals(selectedExpectedNumOfItems))
.map(FilterField::fpp)
.collect(Collectors.toList());
if (fppList.size() != 1) {
throw new RuntimeException("Could not find fpp value for bit size: <[" + bitSize + "]>");
}
selectedFpp = fppList.get(0);
}
else {
throw new IllegalArgumentException("no such filterSize <[" + bitSize + "]>");
Expand Down
Loading