Skip to content

Commit

Permalink
Merge pull request #1724 from AnuGayan/up_master
Browse files Browse the repository at this point in the history
Improve IncrementalDataPurger logic and Persisted Aggregation
  • Loading branch information
CharukaK authored Apr 23, 2021
2 parents ab58ff0 + 937451d commit 0bf06c1
Show file tree
Hide file tree
Showing 6 changed files with 68 additions and 24 deletions.
5 changes: 3 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,11 @@
# limitations under the License.

language: java

jdk:
- openjdk8
after_success:
- bash <(curl -s https://codecov.io/bash)

script: mvn clean install -q -B -V | grep -v DEBUG; exit "${PIPESTATUS[0]}";
cache:
directories:
- $HOME/.m2
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import io.siddhi.query.api.definition.TableDefinition;
import io.siddhi.query.api.execution.query.OnDemandQuery;
import io.siddhi.query.api.execution.query.input.store.InputStore;
import io.siddhi.query.api.execution.query.selection.OrderByAttribute;
import io.siddhi.query.api.execution.query.selection.OutputAttribute;
import io.siddhi.query.api.execution.query.selection.Selector;
import io.siddhi.query.api.expression.Expression;
Expand Down Expand Up @@ -408,6 +409,7 @@ private OnDemandQuery getOnDemandQuery(Table table, long timeFrom, long timeTo)
outputAttributes.add(new OutputAttribute(new Variable(AGG_START_TIMESTAMP_COL)));
Selector selector = Selector.selector().addSelectionList(outputAttributes)
.groupBy(Expression.variable(AGG_START_TIMESTAMP_COL))
.orderBy(Expression.variable(AGG_START_TIMESTAMP_COL), OrderByAttribute.Order.DESC)
.limit(Expression.value(1));
InputStore inputStore;
if (timeTo != 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void process(ComplexEventChunk complexEventChunk) {
if (outputData.length == 3) {
Date fromTime = new Date((Long) outputData[0]);
Date toTime = new Date((Long) outputData[1]);
log.info("Aggregation executed for duration " + duration + " from " + fromTime + " to " +
log.debug("Aggregation executed for duration " + duration + " from " + fromTime + " to " +
toTime + " and " + outputData[2] + " records has been successfully updated ");
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,8 +162,10 @@ public final class SiddhiConstants {
public static final String PLACEHOLDER_QUERY = "{{QUERY}}";
public static final String PLACEHOLDER_SELECTORS = "{{SELECTORS}}";
public static final String PLACEHOLDER_CONDITION = "{{CONDITION}}";
public static final String PLACEHOLDER_INNER_QUERY = "{{INNER_QUERY}}";
public static final String PLACEHOLDER_INNER_QUERY_1 = "{{INNER_QUERY_1}}";
public static final String PLACEHOLDER_INNER_QUERY_2 = "{{INNER_QUERY_2}}";
public static final String PLACEHOLDER_FROM_CONDITION = "{{FROM_CONDITION}}";
public static final String PLACEHOLDER_ON_CONDITION = "{{ON_CONDITION}}";

public static final String INSERT_TO_TABLE_NAME = "TO_TABLE_NAME";
public static final String FROM_TABLE_NAME = "FROM_TABLE_NAME";
Expand All @@ -172,12 +174,14 @@ public final class SiddhiConstants {
public static final String TO_TIMESTAMP = "TO_TIMESTAMP";
public static final String SUB_SELECT_QUERY_REF_T2 = "t2";
public static final String SUB_SELECT_QUERY_REF_T1 = "t1";
public static final String INNER_SELECT_QUERY_REF_T3 = "t3";
public static final String EQUALS = " = ";
public static final String SQL_AS = " AS ";
public static final String SQL_AND = " AND ";
public static final String SQL_ON = " ON ";
public static final String SQL_SELECT = " SELECT ";
public static final String SQL_WHERE = " WHERE ";
public static final String SQL_FROM = " FROM ";
public static final String SQL_NOT_NULL = " IS NOT NULL ";

}
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@
import static io.siddhi.core.util.SiddhiConstants.EQUALS;
import static io.siddhi.core.util.SiddhiConstants.FROM_TIMESTAMP;
import static io.siddhi.core.util.SiddhiConstants.FUNCTION_NAME_CUD;
import static io.siddhi.core.util.SiddhiConstants.INNER_SELECT_QUERY_REF_T3;
import static io.siddhi.core.util.SiddhiConstants.METRIC_INFIX_AGGREGATIONS;
import static io.siddhi.core.util.SiddhiConstants.METRIC_TYPE_FIND;
import static io.siddhi.core.util.SiddhiConstants.METRIC_TYPE_INSERT;
Expand All @@ -125,12 +126,15 @@
import static io.siddhi.core.util.SiddhiConstants.PLACEHOLDER_CONDITION;
import static io.siddhi.core.util.SiddhiConstants.PLACEHOLDER_DURATION;
import static io.siddhi.core.util.SiddhiConstants.PLACEHOLDER_FROM_CONDITION;
import static io.siddhi.core.util.SiddhiConstants.PLACEHOLDER_INNER_QUERY;
import static io.siddhi.core.util.SiddhiConstants.PLACEHOLDER_INNER_QUERY_1;
import static io.siddhi.core.util.SiddhiConstants.PLACEHOLDER_INNER_QUERY_2;
import static io.siddhi.core.util.SiddhiConstants.PLACEHOLDER_ON_CONDITION;
import static io.siddhi.core.util.SiddhiConstants.PLACEHOLDER_SELECTORS;
import static io.siddhi.core.util.SiddhiConstants.PLACEHOLDER_TABLE_NAME;
import static io.siddhi.core.util.SiddhiConstants.SQL_AND;
import static io.siddhi.core.util.SiddhiConstants.SQL_AS;
import static io.siddhi.core.util.SiddhiConstants.SQL_FROM;
import static io.siddhi.core.util.SiddhiConstants.SQL_NOT_NULL;
import static io.siddhi.core.util.SiddhiConstants.SQL_SELECT;
import static io.siddhi.core.util.SiddhiConstants.SQL_WHERE;
import static io.siddhi.core.util.SiddhiConstants.SUB_SELECT_QUERY_REF_T1;
Expand Down Expand Up @@ -1193,6 +1197,7 @@ private static String generateDatabaseQuery(List<ExpressionExecutor> expressionE
StringJoiner innerSelectT2ColumnJoiner = new StringJoiner(", ", SQL_SELECT, " ");

StringJoiner onConditionBuilder = new StringJoiner(SQL_AND);
StringJoiner subSelectT2OnConditionBuilder = new StringJoiner(SQL_AND);

StringJoiner groupByQueryBuilder = new StringJoiner(", ");
StringJoiner finalSelectQuery = new StringJoiner(" ");
Expand All @@ -1204,6 +1209,8 @@ private static String generateDatabaseQuery(List<ExpressionExecutor> expressionE
String innerFromClause = SQL_FROM + parentAggregationTable.getTableDefinition().getId();
String innerWhereFilterClause;
String groupByClause;
String innerT2WhereCondition;


StringJoiner innerT2Query = new StringJoiner(" ");
StringJoiner subQueryT1 = new StringJoiner(" ");
Expand Down Expand Up @@ -1232,8 +1239,14 @@ private static String generateDatabaseQuery(List<ExpressionExecutor> expressionE
groupByQueryBuilder.add(variable.getAttributeName());
onConditionBuilder.add(SUB_SELECT_QUERY_REF_T1 + "." + variable.getAttributeName() +
EQUALS + SUB_SELECT_QUERY_REF_T2 + "." + variable.getAttributeName());
subSelectT2OnConditionBuilder.add(parentAggregationTable.getTableDefinition().getId() + "." +
variable.getAttributeName() + EQUALS + INNER_SELECT_QUERY_REF_T3 + "." +
variable.getAttributeName());
});

innerT2WhereCondition = INNER_SELECT_QUERY_REF_T3 + "." + groupByVariableList.get(0).getAttributeName() +
SQL_NOT_NULL;

for (ExpressionExecutor expressionExecutor : expressionExecutors) {

if (expressionExecutor instanceof VariableExpressionExecutor) {
Expand All @@ -1244,11 +1257,16 @@ private static String generateDatabaseQuery(List<ExpressionExecutor> expressionE
} else if (!variableExpressionExecutor.getAttribute().getName().equals(AGG_EXTERNAL_TIMESTAMP_COL)) {
subSelectT2ColumnJoiner.add(variableExpressionExecutor.getAttribute().getName());
if (groupByColumnNames.contains(variableExpressionExecutor.getAttribute().getName())) {
subSelectT2ColumnJoiner.add(INNER_SELECT_QUERY_REF_T3 + "." +
variableExpressionExecutor.getAttribute().getName() + SQL_AS +
variableExpressionExecutor.getAttribute().getName()) ;
outerSelectColumnJoiner.add(SUB_SELECT_QUERY_REF_T1 + "." +
variableExpressionExecutor.getAttribute().getName() +
SQL_AS + attributeList.get(i).getName());
subSelectT1ColumnJoiner.add(variableExpressionExecutor.getAttribute().getName());
innerSelectT2ColumnJoiner.add(variableExpressionExecutor.getAttribute().getName());
} else {
subSelectT2ColumnJoiner.add(variableExpressionExecutor.getAttribute().getName());
outerSelectColumnJoiner.add(SUB_SELECT_QUERY_REF_T2 + "." +
variableExpressionExecutor.getAttribute().getName() +
SQL_AS + attributeList.get(i).getName());
Expand All @@ -1274,10 +1292,14 @@ private static String generateDatabaseQuery(List<ExpressionExecutor> expressionE
} else if (expressionExecutor instanceof MaxAttributeAggregatorExecutor) {
if (attributeList.get(i).getName().equals(AGG_LAST_TIMESTAMP_COL)) {
innerSelectT2ColumnJoiner.add(dbAggregationSelectFunctionTemplates.getMaxFunction().
replace(PLACEHOLDER_COLUMN, attributeList.get(i).getName()));
subSelectT2ColumnJoiner.add(attributeList.get(i).getName());
replace(PLACEHOLDER_COLUMN, attributeList.get(i).getName()) + SQL_AS + attributeList.get(i).getName());
subSelectT2ColumnJoiner.add(INNER_SELECT_QUERY_REF_T3 + "." + attributeList.get(i).getName() +
SQL_AS + attributeList.get(i).getName());
outerSelectColumnJoiner.add(SUB_SELECT_QUERY_REF_T2 + "." + attributeList.get(i).getName() +
SQL_AS + attributeList.get(i).getName());
subSelectT2OnConditionBuilder.add(parentAggregationTable.getTableDefinition().getId() + "." +
attributeList.get(i).getName() + EQUALS + INNER_SELECT_QUERY_REF_T3 + "." +
attributeList.get(i).getName());
} else {
outerSelectColumnJoiner.add(SUB_SELECT_QUERY_REF_T1 + "." + attributeList.get(i).getName() + SQL_AS +
attributeList.get(i).getName());
Expand Down Expand Up @@ -1320,13 +1342,15 @@ private static String generateDatabaseQuery(List<ExpressionExecutor> expressionE
subQueryT2.add(dbAggregationSelectQueryTemplate.getSelectQueryWithInnerSelect().
replace(PLACEHOLDER_SELECTORS, subSelectT2ColumnJoiner.toString()).
replace(PLACEHOLDER_TABLE_NAME, parentAggregationTable.getTableDefinition().getId()).
replace(PLACEHOLDER_COLUMN, AGG_LAST_TIMESTAMP_COL).
replace(PLACEHOLDER_INNER_QUERY, innerT2Query.toString()));
replace(PLACEHOLDER_INNER_QUERY_2, innerT2Query.toString()).
replace(PLACEHOLDER_ON_CONDITION, subSelectT2OnConditionBuilder.toString()).
replace(PLACEHOLDER_CONDITION, innerT2WhereCondition));


finalSelectQuery.add(dbAggregationSelectQueryTemplate.getJoinClause().
replace(PLACEHOLDER_SELECTORS, outerSelectColumnJoiner.toString()).
replace(PLACEHOLDER_FROM_CONDITION, subQueryT1.toString()).
replace(PLACEHOLDER_INNER_QUERY, subQueryT2.toString()).
replace(PLACEHOLDER_INNER_QUERY_1, subQueryT2.toString()).
replace(PLACEHOLDER_CONDITION, onConditionBuilder.toString()));

completeQuery.add(insertIntoQueryBuilder.toString()).add(finalSelectQuery.toString());
Expand Down Expand Up @@ -1389,7 +1413,7 @@ private static Database getDatabaseType(ConfigManager configManager, String data
return Database.MYSQL;
} else if (databaseType.contains("oracle")) {
return Database.ORACLE;
} else if (databaseType.contains("mssql")) {
} else if (databaseType.contains("mssql") || databaseType.contains("sqlserver")) {
return Database.MSSQL;
} else if (databaseType.contains("postgres")) {
return Database.PostgreSQL;
Expand Down
39 changes: 26 additions & 13 deletions modules/siddhi-core/src/main/resources/db-aggregation-config.xml
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,21 @@
<recordInsertQuery>INSERT INTO {{TABLE_NAME}} ({{COLUMNS}})</recordInsertQuery>
<selectClause>SELECT {{SELECTORS}} FROM {{TABLE_NAME}}</selectClause>
<whereClause>WHERE {{CONDITION}}</whereClause>
<selectQueryWithInnerSelect>SELECT {{SELECTORS}} FROM {{TABLE_NAME}} WHERE {{COLUMN}} IN ({{INNER_QUERY}})
<selectQueryWithInnerSelect>SELECT {{SELECTORS}} FROM {{TABLE_NAME}} LEFT JOIN ({{INNER_QUERY_2}}) AS t3 ON
{{ON_CONDITION}} WHERE {{CONDITION}}
</selectQueryWithInnerSelect>
<joinClause>SELECT {{SELECTORS}} FROM ({{FROM_CONDITION}}) AS t1 JOIN ({{INNER_QUERY}}) AS t2 ON {{CONDITION}}</joinClause>
<joinClause>SELECT {{SELECTORS}} FROM ({{FROM_CONDITION}}) AS t1 JOIN ({{INNER_QUERY_1}}) AS t2 ON
{{CONDITION}}
</joinClause>
<groupByClause>GROUP BY {{COLUMNS}}</groupByClause>
</selectQueryTemplate>
<selectQueryFunctions>
<sumFunction>SUM({{COLUMN}})</sumFunction>
<countFunction>COUNT({{COLUMN}})</countFunction>
<maxFunction>MAX({{COLUMN}})</maxFunction>
<minFunction>MIN({{COLUMN}})</minFunction>
<timeConversionFunction>UNIX_TIMESTAMP(from_unixtime({{COLUMN}}/1000,"{{DURATION}}"))*1000</timeConversionFunction>
<timeConversionFunction>UNIX_TIMESTAMP(from_unixtime({{COLUMN}}/1000,"{{DURATION}}"))*1000
</timeConversionFunction>
</selectQueryFunctions>
<timeConversionDurationMapping>
<day>%Y-%m-%d</day>
Expand All @@ -41,9 +45,12 @@
<recordInsertQuery>INSERT INTO {{TABLE_NAME}} ({{COLUMNS}})</recordInsertQuery>
<selectClause>SELECT {{SELECTORS}} FROM {{TABLE_NAME}}</selectClause>
<whereClause>WHERE {{CONDITION}}</whereClause>
<selectQueryWithInnerSelect>SELECT {{SELECTORS}} FROM {{TABLE_NAME}} WHERE {{COLUMN}} IN ({{INNER_QUERY}})
<selectQueryWithInnerSelect>SELECT {{SELECTORS}} FROM {{TABLE_NAME}} LEFT JOIN ({{INNER_QUERY_2}}) t3 ON
{{ON_CONDITION}} WHERE {{CONDITION}}
</selectQueryWithInnerSelect>
<joinClause>SELECT {{SELECTORS}} FROM ({{FROM_CONDITION}}) t1 JOIN ({{INNER_QUERY}}) t2 ON {{CONDITION}}</joinClause>
<joinClause>SELECT {{SELECTORS}} FROM ({{FROM_CONDITION}}) t1 JOIN ({{INNER_QUERY_1}}) t2 ON
{{CONDITION}}
</joinClause>
<groupByClause>GROUP BY {{COLUMNS}}</groupByClause>
</selectQueryTemplate>
<selectQueryFunctions>
Expand Down Expand Up @@ -75,22 +82,25 @@
<bigStringType>CLOB</bigStringType>
</typeMapping>
</database>
<database name="Microsoft SQL Server">
<database name="MSSQL">
<selectQueryTemplate>
<recordInsertQuery>INSERT INTO {{TABLE_NAME}} ({{COLUMNS}})</recordInsertQuery>
<selectClause>SELECT {{SELECTORS}} FROM {{TABLE_NAME}}</selectClause>
<whereClause>WHERE {{CONDITION}}</whereClause>
<selectQueryWithInnerSelect>SELECT {{SELECTORS}} FROM {{TABLE_NAME}} WHERE {{COLUMN}} IN ({{INNER_QUERY}})
<selectQueryWithInnerSelect>SELECT {{SELECTORS}} FROM {{TABLE_NAME}} LEFT JOIN ({{INNER_QUERY_2}}) AS t3 ON
{{ON_CONDITION}} WHERE {{CONDITION}}
</selectQueryWithInnerSelect>
<joinClause>SELECT {{SELECTORS}} FROM ({{FROM_CONDITION}}) AS t1 JOIN ({{INNER_QUERY}}) AS t2 ON {{CONDITION}}</joinClause>
<joinClause>SELECT {{SELECTORS}} FROM ({{FROM_CONDITION}}) AS t1 JOIN ({{INNER_QUERY_1}}) AS t2 ON
{{CONDITION}}
</joinClause>
<groupByClause>GROUP BY {{COLUMNS}}</groupByClause>
</selectQueryTemplate>
<selectQueryFunctions>
<sumFunction>SUM({{COLUMN}})</sumFunction>
<countFunction>COUNT({{COLUMN}})</countFunction>
<maxFunction>MAX({{COLUMN}})</maxFunction>
<minFunction>MIN({{COLUMN}})</minFunction>
<timeConversionFunction>DATEDIFF(second , '19700101',DATEADD({{DURATION}} , DATEDIFF({{DURATION}} , 0, DATEADD(ss, {{COLUMN}}/1000, '19700101')), 0))*1000</timeConversionFunction>
<timeConversionFunction>CAST(DATEDIFF(second , '19700101',DATEADD({{DURATION}} , DATEDIFF({{DURATION}} , 0, DATEADD(ss, {{COLUMN}}/1000, '19700101')), 0)) AS bigint)*1000</timeConversionFunction>
</selectQueryFunctions>
<timeConversionDurationMapping>
<day>dd</day>
Expand All @@ -116,17 +126,20 @@
<recordInsertQuery>INSERT INTO {{TABLE_NAME}} ({{COLUMNS}})</recordInsertQuery>
<selectClause>SELECT {{SELECTORS}} FROM {{TABLE_NAME}}</selectClause>
<whereClause>WHERE {{CONDITION}}</whereClause>
<selectQueryWithInnerSelect>SELECT {{SELECTORS}} FROM {{TABLE_NAME}} WHERE {{COLUMN}} IN ({{INNER_QUERY}})
<selectQueryWithInnerSelect>SELECT {{SELECTORS}} FROM {{TABLE_NAME}} LEFT JOIN ({{INNER_QUERY_2}}) AS t3 ON
{{ON_CONDITION}} WHERE {{CONDITION}}
</selectQueryWithInnerSelect>
<joinClause>SELECT {{SELECTORS}} FROM ({{FROM_CONDITION}}) AS t1 JOIN ({{INNER_QUERY}}) AS t2 ON {{CONDITION}}</joinClause>
<joinClause>SELECT {{SELECTORS}} FROM ({{FROM_CONDITION}}) AS t1 JOIN ({{INNER_QUERY_1}}) AS t2 ON
{{CONDITION}}
</joinClause>
<groupByClause>GROUP BY {{COLUMNS}}</groupByClause>
</selectQueryTemplate>
<selectQueryFunctions>
<sumFunction>SUM({{COLUMN}})</sumFunction>
<countFunction>COUNT({{COLUMN}})</countFunction>
<maxFunction>MAX({{COLUMN}})</maxFunction>
<minFunction>MIN({{COLUMN}})</minFunction>
<timeConversionFunction>EXTRACT(epoch from date_trunc('{{DURATION}}', to_timestamp({{COLUMN}}/1000)))*1000;
<timeConversionFunction>EXTRACT(epoch from date_trunc('{{DURATION}}', to_timestamp({{COLUMN}}/1000)))*1000;
</timeConversionFunction>
</selectQueryFunctions>
<timeConversionDurationMapping>
Expand All @@ -147,4 +160,4 @@
<stringType>VARCHAR</stringType>
</typeMapping>
</database>
</rdbms-table-configuration>
</rdbms-table-configuration>

0 comments on commit 0bf06c1

Please sign in to comment.