Skip to content

Commit

Permalink
Implements a custom task executor that allows failed queries to be sk…
Browse files Browse the repository at this point in the history
…ipped (#145)

* Implements a custom task executor that allows queries to be skipped if exception contains (a) certain string(s).
  • Loading branch information
anjagruenheid authored Oct 16, 2023
1 parent 7e871e0 commit 6e5d861
Show file tree
Hide file tree
Showing 17 changed files with 158 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,17 +115,16 @@ private Map<String, Object> updateRuntimeParameterValues(TaskExec task) {

private TaskExecutor getTaskExecutor(TaskExec task) {
if (task.getCustomTaskExecutor() == null) {
return new TaskExecutor(this.telemetryRegistry, this.experimentStartTime);
return new TaskExecutor(
this.telemetryRegistry, this.experimentStartTime, task.getTaskExecutorArguments());
} else {
try {
Constructor<?> constructor =
Class.forName(task.getCustomTaskExecutor())
.getDeclaredConstructor(SQLTelemetryRegistry.class, String.class, Map.class);
return (TaskExecutor)
constructor.newInstance(
this.telemetryRegistry,
this.experimentStartTime,
task.getCustomTaskExecutorArguments());
this.telemetryRegistry, this.experimentStartTime, task.getTaskExecutorArguments());
} catch (Exception e) {
throw new IllegalArgumentException(
"Unable to load custom task class: " + task.getCustomTaskExecutor(), e);
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/microsoft/lst_bench/exec/TaskExec.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ public interface TaskExec {
@Nullable String getTimeTravelPhaseId();

@Value.Parameter(false)
@Nullable String getCustomTaskExecutor();
@Nullable Map<String, String> getTaskExecutorArguments();

@Value.Parameter(false)
@Nullable Map<String, String> getCustomTaskExecutorArguments();
@Nullable String getCustomTaskExecutor();
}
Original file line number Diff line number Diff line change
Expand Up @@ -221,8 +221,8 @@ private static TaskExec createTaskExec(
taskTemplateIdToParameterValuesCounter);
return ImmutableTaskExec.of(taskId, files)
.withTimeTravelPhaseId(task.getTimeTravelPhaseId())
.withCustomTaskExecutor(task.getCustomTaskExecutor())
.withCustomTaskExecutorArguments(task.getCustomTaskExecutorArguments());
.withTaskExecutorArguments(task.getTaskExecutorArguments())
.withCustomTaskExecutor(task.getCustomTaskExecutor());
}

private static List<FileExec> createFileExecList(
Expand Down
6 changes: 3 additions & 3 deletions src/main/java/com/microsoft/lst_bench/input/Task.java
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,12 @@ public interface Task {
@JsonProperty("time_travel_phase_id")
@Nullable String getTimeTravelPhaseId();

@JsonProperty("task_executor_arguments")
@Nullable Map<String, String> getTaskExecutorArguments();

@JsonProperty("custom_task_executor")
@Nullable String getCustomTaskExecutor();

@JsonProperty("custom_task_executor_arguments")
@Nullable Map<String, String> getCustomTaskExecutorArguments();

@JsonProperty("replace_regex")
@Nullable List<ReplaceRegex> getReplaceRegex();

Expand Down
71 changes: 64 additions & 7 deletions src/main/java/com/microsoft/lst_bench/task/TaskExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,23 +33,59 @@

/**
* Default executor for tasks. Iterates over all files and all the statements contained in those
* files and executes them sequentially.
* files and executes them sequentially. This task executor allows users to avoid failing execution
* by skipping queries which return a specific substring in their error message. For this task
* executor, we allow users to determine the exception strings as part of the input, by specifying
* parameter 'skip_erroneous_query_strings'. If multiple strings can lead to a skip action, they
* need to be separated with delimiter ';' by default.
*/
public class TaskExecutor {

private static final Logger LOGGER = LoggerFactory.getLogger(TaskExecutor.class);

/** Delimiter on which the configured skip-strings argument is split into individual substrings. */
private static final String SKIP_ERRONEOUS_QUERY_DELIMITER = ";";

/** Key under which the skip substrings are looked up in the task executor arguments. */
private static final String SKIP_ERRONEOUS_QUERY_STRINGS_KEY = "skip_erroneous_query_strings";

protected final SQLTelemetryRegistry telemetryRegistry;
protected final String experimentStartTime;
protected final Map<String, String> arguments;

public TaskExecutor(SQLTelemetryRegistry telemetryRegistry, String experimentStartTime) {
public TaskExecutor(
SQLTelemetryRegistry telemetryRegistry,
String experimentStartTime,
Map<String, String> arguments) {
this.experimentStartTime = experimentStartTime;
this.telemetryRegistry = telemetryRegistry;
this.arguments = arguments;
}

/**
 * Returns the arguments this executor was constructed with.
 *
 * @return the argument map held by this executor; may be {@code null}
 */
protected Map<String, String> getArguments() {
  return arguments;
}

/**
 * Collects the substrings that mark a failed query as skippable.
 *
 * <p>The value stored under {@code SKIP_ERRONEOUS_QUERY_STRINGS_KEY} is split on {@code
 * SKIP_ERRONEOUS_QUERY_DELIMITER}. Empty tokens (produced by an empty value or by leading or
 * doubled delimiters) are dropped: {@code String.contains("")} is true for every message, so a
 * single empty token would cause every failing query to be skipped.
 *
 * @return the non-empty substrings to match against error messages; an empty array when no
 *     arguments were supplied or the key is absent
 */
protected String[] getExceptionStrings() {
  Map<String, String> args = this.getArguments();
  String configured = (args == null) ? null : args.get(SKIP_ERRONEOUS_QUERY_STRINGS_KEY);
  if (configured == null) {
    return new String[] {};
  }

  // Compact the non-empty tokens to the front of the array, then truncate to that length.
  String[] tokens = configured.split(SKIP_ERRONEOUS_QUERY_DELIMITER);
  int kept = 0;
  for (String token : tokens) {
    if (!token.isEmpty()) {
      tokens[kept++] = token;
    }
  }
  return java.util.Arrays.copyOf(tokens, kept);
}

public void executeTask(Connection connection, TaskExec task, Map<String, Object> values)
throws ClientException {
String[] exceptionStrings = this.getExceptionStrings();

for (FileExec file : task.getFiles()) {
boolean skip = false;

Instant fileStartTime = Instant.now();
try {
for (StatementExec statement : file.getStatements()) {
Expand All @@ -64,17 +100,38 @@ public void executeTask(Connection connection, TaskExec task, Map<String, Object
+ statement.getStatement()
+ "; error message: "
+ e.getMessage();
LOGGER.error(loggedError);
for (String skipException : exceptionStrings) {
if (e.getMessage().contains(skipException)) {
LOGGER.warn(loggedError);
writeStatementEvent(
statementStartTime, statement.getId(), Status.WARN, /* payload= */ loggedError);

skip = true;
break;
}
}

if (!skip) {
LOGGER.error(loggedError);
writeStatementEvent(
statementStartTime,
statement.getId(),
Status.FAILURE,
/* payload= */ loggedError);

throw e;
}
}
// Only log success if we have not skipped execution.
if (!skip) {
writeStatementEvent(
statementStartTime, statement.getId(), Status.FAILURE, /* payload= */ loggedError);
throw e;
statementStartTime, statement.getId(), Status.SUCCESS, /* payload= */ null);
}
writeStatementEvent(
statementStartTime, statement.getId(), Status.SUCCESS, /* payload= */ null);
}
} catch (Exception e) {
LOGGER.error("Exception executing file: " + file.getId());
writeFileEvent(fileStartTime, file.getId(), Status.FAILURE);

throw e;
}
writeFileEvent(fileStartTime, file.getId(), Status.SUCCESS);
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.microsoft.lst_bench.exec.FileExec;
import com.microsoft.lst_bench.exec.StatementExec;
import com.microsoft.lst_bench.exec.TaskExec;
import com.microsoft.lst_bench.task.TaskExecutor;
import com.microsoft.lst_bench.telemetry.EventInfo.Status;
import com.microsoft.lst_bench.telemetry.SQLTelemetryRegistry;
import com.microsoft.lst_bench.util.StringUtils;
Expand All @@ -36,10 +37,10 @@
* result; and b) a statement repeatedly that is expected to use that result. The result of the
* first statement is stored in a QueryResult object which is then used and interpreted by the
* second statement. For this task executor, we allow the second statement to be executed in
* batches. The batch size can be set via the 'custom_task_executor_arguments' property that is part
* of the workload configuration. The parameter name is 'dependent_task_batch_size'.
* batches. The batch size can be set via the 'task_executor_arguments' property that is part of the
* workload configuration. The parameter name is 'dependent_task_batch_size'.
*/
public class DependentTaskExecutor extends CustomTaskExecutor {
public class DependentTaskExecutor extends TaskExecutor {

private static final Logger LOGGER = LoggerFactory.getLogger(DependentTaskExecutor.class);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ enum EventType {
enum Status {
SUCCESS,
FAILURE,
WARN,
UNKNOWN;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,11 +37,11 @@ phases:
- tasks:
- template_id: data_maintenance_dependent
custom_task_executor: com.microsoft.lst_bench.task.custom.DependentTaskExecutor
custom_task_executor_arguments:
task_executor_arguments:
dependent_task_batch_size: 1000
- template_id: data_maintenance_dependent
custom_task_executor: com.microsoft.lst_bench.task.custom.DependentTaskExecutor
custom_task_executor_arguments:
task_executor_arguments:
dependent_task_batch_size: 1000
- id: single_user_2
sessions:
Expand All @@ -52,11 +52,11 @@ phases:
- tasks:
- template_id: data_maintenance_dependent
custom_task_executor: com.microsoft.lst_bench.task.custom.DependentTaskExecutor
custom_task_executor_arguments:
task_executor_arguments:
dependent_task_batch_size: 1000
- template_id: data_maintenance_dependent
custom_task_executor: com.microsoft.lst_bench.task.custom.DependentTaskExecutor
custom_task_executor_arguments:
task_executor_arguments:
dependent_task_batch_size: 1000
- id: single_user_3
sessions:
Expand All @@ -67,11 +67,11 @@ phases:
- tasks:
- template_id: data_maintenance_dependent
custom_task_executor: com.microsoft.lst_bench.task.custom.DependentTaskExecutor
custom_task_executor_arguments:
task_executor_arguments:
dependent_task_batch_size: 1000
- template_id: data_maintenance_dependent
custom_task_executor: com.microsoft.lst_bench.task.custom.DependentTaskExecutor
custom_task_executor_arguments:
task_executor_arguments:
dependent_task_batch_size: 1000
- id: single_user_4
sessions:
Expand All @@ -82,11 +82,11 @@ phases:
- tasks:
- template_id: data_maintenance_dependent
custom_task_executor: com.microsoft.lst_bench.task.custom.DependentTaskExecutor
custom_task_executor_arguments:
task_executor_arguments:
dependent_task_batch_size: 1000
- template_id: data_maintenance_dependent
custom_task_executor: com.microsoft.lst_bench.task.custom.DependentTaskExecutor
custom_task_executor_arguments:
task_executor_arguments:
dependent_task_batch_size: 1000
- id: single_user_5
sessions:
Expand All @@ -97,11 +97,11 @@ phases:
- tasks:
- template_id: data_maintenance_dependent
custom_task_executor: com.microsoft.lst_bench.task.custom.DependentTaskExecutor
custom_task_executor_arguments:
task_executor_arguments:
dependent_task_batch_size: 1000
- template_id: data_maintenance_dependent
custom_task_executor: com.microsoft.lst_bench.task.custom.DependentTaskExecutor
custom_task_executor_arguments:
task_executor_arguments:
dependent_task_batch_size: 1000
- id: single_user_6
sessions:
Expand Down
10 changes: 5 additions & 5 deletions src/main/resources/schemas/workload.json
Original file line number Diff line number Diff line change
Expand Up @@ -55,16 +55,16 @@
"title": "Time travel phase identifier",
"description": "If the task template supports time travel, this property can be used to specify the version of the data that will be queried by this task"
},
"task_executor_arguments": {
"type": "object",
"title": "Task executor arguments",
"description": "Any arguments passed to a (custom) task executor need to be defined here."
},
"custom_task_executor": {
"type": "string",
"title": "Identifier for a custom task executor",
"description": "If the task template should be executed with a custom task executor, this property can be used to specify which executor to use"
},
"custom_task_executor_arguments": {
"type": "object",
"title": "Custom task executor arguments",
"description": "Any arguments passed to a custom executor need to be defined here."
},
"replace_regex": {
"type": "array",
"title": "List of regex to match and replace",
Expand Down
36 changes: 36 additions & 0 deletions src/test/java/com/microsoft/lst_bench/DriverSparkTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,42 @@ public void testJDBCSessionIceberg() throws Exception {
"src/test/resources/config/spark/w_all_tpcds_single_session_jdbc-iceberg.yaml");
}

/**
 * Runs the driver with the faulty-query workload using the Delta experiment configuration. Only
 * enabled when system property {@code lst-bench.test.lst} matches {@code delta} and {@code
 * lst-bench.test.connection} matches {@code jdbc}.
 */
@Test
@EnabledIfSystemProperty(named = "lst-bench.test.lst", matches = "delta")
@EnabledIfSystemProperty(named = "lst-bench.test.connection", matches = "jdbc")
public void testJDBCSkipFailedQueriesDelta() throws Exception {
  String connectionConfig = "src/test/resources/config/spark/jdbc_connection_config.yaml";
  String experimentConfig = "src/test/resources/config/spark/experiment_config-delta.yaml";
  String telemetryConfig = "src/test/resources/config/spark/telemetry_config.yaml";
  String taskLibrary = "src/test/resources/config/spark/simplified_task_library.yaml";
  String workload = "src/test/resources/config/spark/w_faulty_query_test.yaml";

  runDriver(connectionConfig, experimentConfig, telemetryConfig, taskLibrary, workload);
}

/**
 * Runs the driver with the faulty-query workload using the Hudi experiment configuration. Only
 * enabled when system property {@code lst-bench.test.lst} matches {@code hudi} and {@code
 * lst-bench.test.connection} matches {@code jdbc}.
 */
@Test
@EnabledIfSystemProperty(named = "lst-bench.test.lst", matches = "hudi")
@EnabledIfSystemProperty(named = "lst-bench.test.connection", matches = "jdbc")
public void testJDBCSkipFailedQueriesHudi() throws Exception {
  String connectionConfig = "src/test/resources/config/spark/jdbc_connection_config.yaml";
  String experimentConfig = "src/test/resources/config/spark/experiment_config-hudi.yaml";
  String telemetryConfig = "src/test/resources/config/spark/telemetry_config.yaml";
  String taskLibrary = "src/test/resources/config/spark/simplified_task_library.yaml";
  String workload = "src/test/resources/config/spark/w_faulty_query_test.yaml";

  runDriver(connectionConfig, experimentConfig, telemetryConfig, taskLibrary, workload);
}

/**
 * Runs the driver with the faulty-query workload using the Iceberg experiment configuration. Only
 * enabled when system property {@code lst-bench.test.lst} matches {@code iceberg} and {@code
 * lst-bench.test.connection} matches {@code jdbc}.
 *
 * <p>NOTE(review): the method name looks like a typo for "...Iceberg"; it is kept unchanged here
 * so the test identifier stays stable.
 */
@Test
@EnabledIfSystemProperty(named = "lst-bench.test.lst", matches = "iceberg")
@EnabledIfSystemProperty(named = "lst-bench.test.connection", matches = "jdbc")
public void testJDBCSkipFailedQueriesIceber() throws Exception {
  String connectionConfig = "src/test/resources/config/spark/jdbc_connection_config.yaml";
  String experimentConfig = "src/test/resources/config/spark/experiment_config-iceberg.yaml";
  String telemetryConfig = "src/test/resources/config/spark/telemetry_config.yaml";
  String taskLibrary = "src/test/resources/config/spark/simplified_task_library.yaml";
  String workload = "src/test/resources/config/spark/w_faulty_query_test.yaml";

  runDriver(connectionConfig, experimentConfig, telemetryConfig, taskLibrary, workload);
}

private void runDriver(String arg0, String arg1, String arg2, String arg3, String arg4)
throws Exception {
Driver.main(new String[] {"-c", arg0, "-e", arg1, "-t", arg2, "-l", arg3, "-w", arg4});
Expand Down
4 changes: 4 additions & 0 deletions src/test/resources/config/spark/simplified_task_library.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,7 @@ task_templates:
- id: optimize_hudi
files:
- src/main/resources/scripts/tpcds/optimize/spark/o_ship_mode-hudi.sql
# Execution of faulty TPC-DS query
- id: faulty_query
files:
- src/test/resources/scripts/faulty_test_query.sql
Loading

0 comments on commit 6e5d861

Please sign in to comment.