Skip to content

Commit

Permalink
feat: Add a new Queries task to run multiple SQL with multiple output…
Browse files Browse the repository at this point in the history
…s, parameter binding and transactions (#398)

* feat: Add a new Queries task to run multiple SQL with multiple outputs, parameter binding and transactions

extract common logic to utils and abstract classes

created test for queries

implemented queries for MySQL

* feat(jdbc): add parameters to Queries task

add parameters to base queries task

add unit tests for mysql with parameters

* feat(jdbc): add rollback when query fails

implement savepoint and rollback when one query fails

add unit test to verify transactional query

* feat(jdbc): remove rollback when query is not transactional

add unit test to verify non transactional query

* refactor: create custom method for named parameters

* fix: implement runnable task for mysql

* doc: add sample flow for mysql
  • Loading branch information
mgabelle authored Oct 21, 2024
1 parent 6996ce8 commit f08f207
Show file tree
Hide file tree
Showing 17 changed files with 858 additions and 246 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,10 @@

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.tasks.Output;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.Task;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.jdbc.AbstractCellConverter;
import io.kestra.plugin.jdbc.AbstractJdbcBatch;
import io.kestra.plugin.jdbc.AbstractJdbcQuery;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.EqualsAndHashCode;
import lombok.Getter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
import lombok.*;
import lombok.experimental.SuperBuilder;

import java.nio.file.Path;
import java.sql.DriverManager;
import java.sql.SQLException;

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package io.kestra.plugin.jdbc.mysql;

import io.micronaut.http.uri.UriBuilder;

import java.net.URI;
import java.nio.file.Path;
import java.util.Properties;

public class MysqlUtils {

private MysqlUtils() {
throw new IllegalStateException("Utility class");
}

protected static Properties createMysqlProperties(Properties props, Path workingDirectory,
boolean isMultiQuery) {
URI url = URI.create((String) props.get("jdbc.url"));
url = URI.create(url.getSchemeSpecificPart());

UriBuilder builder = UriBuilder.of(url);

// allow local in file for current worker and prevent the global one
builder.queryParam("allowLoadLocalInfileInPath", workingDirectory.toAbsolutePath().toString());
builder.replaceQueryParam("allowLoadLocalInfile", false);

// see https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-reference-implementation-notes.html
// By default, ResultSets are completely retrieved and stored in memory.
builder.replaceQueryParam("useCursorFetch", true);

builder.scheme("jdbc:mysql");

if(isMultiQuery) {
builder.queryParam("allowMultiQueries", true);
}

props.put("jdbc.url", builder.build().toString());

return props;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package io.kestra.plugin.jdbc.mysql;

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.models.tasks.runners.PluginUtilsService;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.jdbc.AbstractCellConverter;
import io.kestra.plugin.jdbc.AbstractJdbcBaseQuery;
import io.kestra.plugin.jdbc.AbstractJdbcQueries;
import io.kestra.plugin.jdbc.AbstractJdbcQuery;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.AccessLevel;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;

import java.nio.file.Path;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.time.ZoneId;
import java.util.Map;
import java.util.Properties;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "Query a MySQL database."
)
@Plugin(
examples = {
@Example(
title = "Send a SQL query to a MySQL Database and fetch a row as output.",
full = true,
code = """
id: send_multiple_queries
namespace: test.queries
tasks:
- id: test_queries_insert
type: io.kestra.plugin.jdbc.mysql.Queries
fetchType: FETCH
url: jdbc:mysql://mysql:3306/kestra
username: "${{secret('MYSQL_USERNAME')}}"
password: "${{secret('MYSQL_PASSWORD')}}"
sql: "{{ read('populate.sql') }}"
- id: test_queries_select
type: io.kestra.plugin.jdbc.mysql.Queries
fetchType: FETCH
url: jdbc:mysql://mysql:3306/kestra
username: root
password: mysql_passwd
sql: |
SELECT firstName, lastName FROM employee;
SELECT brand FROM laptop;
"""
)
}
)
public class Queries extends AbstractJdbcQueries implements RunnableTask<AbstractJdbcQueries.MultiQueryOutput> {

@Schema(
title = "Add input file to be loaded with `LOAD DATA LOCAL`.",
description = "The file must be from Kestra's internal storage"
)
@PluginProperty(dynamic = true)
protected String inputFile;

@Getter(AccessLevel.NONE)
protected transient Path workingDirectory;

@Override
protected AbstractCellConverter getCellConverter(ZoneId zoneId) {
return new MysqlCellConverter(zoneId);
}

@Override
public void registerDriver() throws SQLException {
DriverManager.registerDriver(new com.mysql.cj.jdbc.Driver());
}

@Override
public Properties connectionProperties(RunContext runContext) throws Exception {
return MysqlUtils.createMysqlProperties(super.connectionProperties(runContext), this.workingDirectory, true);
}

@Override
public Integer getFetchSize() {
// The combination of useCursorFetch=true and preparedStatement.setFetchSize(10); push to use cursor on MySql DB instance side.
// This leads to consuming DB instance disk memory when we try to fetch more than aware table size.
// It actually just disables client-side caching of the entire response and gives you responses as they arrive as a result it has no effect on the DB
return this.isStore() ? Integer.MIN_VALUE : this.fetchSize;
}

@Override
public AbstractJdbcQueries.MultiQueryOutput run(RunContext runContext) throws Exception {
this.workingDirectory = runContext.workingDir().path();

if (this.inputFile != null) {
PluginUtilsService.createInputFiles(
runContext,
workingDirectory,
Map.of("inputFile", this.inputFile),
additionalVars
);
}

additionalVars.put("inputFile", workingDirectory.toAbsolutePath().resolve("inputFile").toString());

return super.run(runContext);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@
import io.kestra.plugin.jdbc.AbstractCellConverter;
import io.kestra.plugin.jdbc.AbstractJdbcQuery;
import io.kestra.plugin.jdbc.AutoCommitInterface;
import io.micronaut.http.uri.UriBuilder;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.AccessLevel;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;

import java.net.URI;
import java.nio.file.Path;
import java.sql.DriverManager;
import java.sql.SQLException;
Expand Down Expand Up @@ -103,28 +105,10 @@ public void registerDriver() throws SQLException {

@Override
public Properties connectionProperties(RunContext runContext) throws Exception {
Properties props = super.connectionProperties(runContext);

URI url = URI.create((String) props.get("jdbc.url"));
url = URI.create(url.getSchemeSpecificPart());

UriBuilder builder = UriBuilder.of(url);

// allow local in file for current worker and prevent the global one
builder.queryParam("allowLoadLocalInfileInPath", this.workingDirectory.toAbsolutePath().toString());
builder.replaceQueryParam("allowLoadLocalInfile", false);

// see https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-reference-implementation-notes.html
// By default, ResultSets are completely retrieved and stored in memory.
builder.replaceQueryParam("useCursorFetch", true);

builder.scheme("jdbc:mysql");

props.put("jdbc.url", builder.build().toString());

return props;
return MysqlUtils.createMysqlProperties(super.connectionProperties(runContext), this.workingDirectory, false);
}

@Override
public Integer getFetchSize() {
// The combination of useCursorFetch=true and preparedStatement.setFetchSize(10); push to use cursor on MySql DB instance side.
// This leads to consuming DB instance disk memory when we try to fetch more than aware table size.
Expand Down
Loading

0 comments on commit f08f207

Please sign in to comment.