diff --git a/build.gradle b/build.gradle index c758b771..0dbec984 100644 --- a/build.gradle +++ b/build.gradle @@ -29,6 +29,7 @@ subprojects { mavenCentral() if (isBuildSnapshot) { maven { url "https://s01.oss.sonatype.org/content/repositories/snapshots/" } + maven { url "https://maven.dremio.com/free/" } } } diff --git a/docker-compose-ci.yml b/docker-compose-ci.yml index 66d2ad9c..a16bf2a5 100644 --- a/docker-compose-ci.yml +++ b/docker-compose-ci.yml @@ -67,3 +67,10 @@ services: - QuickStart - -type - batch + + dremio: + image: dremio/dremio-oss + ports: + - "9047:9047" + - "31010:31010" + - "45678:45678" \ No newline at end of file diff --git a/plugin-jdbc-arrow-flight/build.gradle b/plugin-jdbc-arrow-flight/build.gradle new file mode 100644 index 00000000..89a0ea5f --- /dev/null +++ b/plugin-jdbc-arrow-flight/build.gradle @@ -0,0 +1,20 @@ +project.description = 'Query a compatible database using the Kestra Apache Arrow Flight SQL JDBC plugin.' + +jar { + manifest { + attributes( + "X-Kestra-Name": project.name, + "X-Kestra-Title": "Arrow Flight SQL", + "X-Kestra-Group": project.group + ".jdbc.arrowflight", + "X-Kestra-Description": project.description, + "X-Kestra-Version": project.version + ) + } +} + +dependencies { + implementation("org.apache.arrow:flight-sql-jdbc-driver:12.0.1") + implementation project(':plugin-jdbc') + + testImplementation project(':plugin-jdbc').sourceSets.test.output +} \ No newline at end of file diff --git a/plugin-jdbc-arrow-flight/src/main/java/io/kestra/plugin/jdbc/arrowflight/ArrowFlightCellConverter.java b/plugin-jdbc-arrow-flight/src/main/java/io/kestra/plugin/jdbc/arrowflight/ArrowFlightCellConverter.java new file mode 100644 index 00000000..6861c6c0 --- /dev/null +++ b/plugin-jdbc-arrow-flight/src/main/java/io/kestra/plugin/jdbc/arrowflight/ArrowFlightCellConverter.java @@ -0,0 +1,20 @@ +package io.kestra.plugin.jdbc.arrowflight; + +import io.kestra.plugin.jdbc.AbstractCellConverter; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.time.ZoneId; + +public class ArrowFlightCellConverter extends AbstractCellConverter { + + public ArrowFlightCellConverter(ZoneId zoneId) { + super(zoneId); + } + + @Override + public Object convertCell(int columnIndex, ResultSet resultSet, Connection connection) throws SQLException { + return super.convert(columnIndex, resultSet); + } +} diff --git a/plugin-jdbc-arrow-flight/src/main/java/io/kestra/plugin/jdbc/arrowflight/Query.java b/plugin-jdbc-arrow-flight/src/main/java/io/kestra/plugin/jdbc/arrowflight/Query.java new file mode 100644 index 00000000..a313f2b9 --- /dev/null +++ b/plugin-jdbc-arrow-flight/src/main/java/io/kestra/plugin/jdbc/arrowflight/Query.java @@ -0,0 +1,68 @@ +package io.kestra.plugin.jdbc.arrowflight; + +import io.kestra.core.models.annotations.Example; +import io.kestra.core.models.annotations.Plugin; +import io.kestra.core.models.tasks.RunnableTask; +import io.kestra.core.runners.RunContext; +import io.kestra.plugin.jdbc.AbstractCellConverter; +import io.kestra.plugin.jdbc.AbstractJdbcQuery; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.ToString; +import lombok.experimental.SuperBuilder; +import org.apache.arrow.driver.jdbc.ArrowFlightJdbcDriver; + +import java.sql.DriverManager; +import java.sql.SQLException; +import java.time.ZoneId; + +@SuperBuilder +@ToString +@EqualsAndHashCode +@Getter +@NoArgsConstructor +@Schema( + title = "Query a database through Apache Arrow Flight SQL driver." +) +@Plugin( + examples = { + @Example( + title = "Send a SQL query to a database and fetch a row as outputs using Apache Arrow Flight SQL driver", + code = { + "url: jdbc:arrow-flight-sql://localhost:31010/?useEncryption=false", + "username: dremio", + "password: dremio123", + "sql: select * FROM departments", + "fetch: true", + } + ), + @Example( + title = "Send a sql query to a Dremio coordinator and fetch a row as outputs using Apache Arrow Flight SQL driver", + code = { + "url: jdbc:arrow-flight-sql://dremio-coordinator:32010/?schema=postgres.public", + "username: $token", + "password: samplePersonalAccessToken", + "sql: select * FROM departments", + "fetch: true", + } + ) + } +) +public class Query extends AbstractJdbcQuery implements RunnableTask { + @Override + protected AbstractCellConverter getCellConverter(ZoneId zoneId) { + return new ArrowFlightCellConverter(zoneId); + } + + @Override + public void registerDriver() throws SQLException { + DriverManager.registerDriver(new ArrowFlightJdbcDriver()); + } + + @Override + public Output run(RunContext runContext) throws Exception { + return super.run(runContext); + } +} diff --git a/plugin-jdbc-arrow-flight/src/main/java/io/kestra/plugin/jdbc/arrowflight/Trigger.java b/plugin-jdbc-arrow-flight/src/main/java/io/kestra/plugin/jdbc/arrowflight/Trigger.java new file mode 100644 index 00000000..fe54c148 --- /dev/null +++ b/plugin-jdbc-arrow-flight/src/main/java/io/kestra/plugin/jdbc/arrowflight/Trigger.java @@ -0,0 +1,78 @@ +package io.kestra.plugin.jdbc.arrowflight; + +import io.kestra.core.models.annotations.Example; +import io.kestra.core.models.annotations.Plugin; +import io.kestra.core.runners.RunContext; +import io.kestra.plugin.jdbc.AbstractJdbcQuery; +import io.kestra.plugin.jdbc.AbstractJdbcTrigger; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.ToString; +import lombok.experimental.SuperBuilder; +import org.apache.arrow.driver.jdbc.ArrowFlightJdbcDriver; + +import java.sql.DriverManager; +import java.sql.SQLException; + +@SuperBuilder +@ToString +@EqualsAndHashCode +@Getter +@NoArgsConstructor +@Schema( + title = "Wait for query on a Arrow Flight database." +) +@Plugin( + examples = { + @Example( + title = "Wait for a SQL query to return results and iterate through rows", + full = true, + code = { + "id: jdbc-trigger", + "namespace: io.kestra.tests", + "", + "tasks:", + " - id: each", + " type: io.kestra.core.tasks.flows.EachSequential", + " tasks:", + " - id: return", + " type: io.kestra.core.tasks.debugs.Return", + " format: \"{{json(taskrun.value)}}\"", + " value: \"{{ trigger.rows }}\"", + "", + "triggers:", + " - id: watch", + " type: io.kestra.plugin.jdbc.arrowflight.Trigger", + " url: jdbc:arrow-flight-sql://dremio-coordinator:32010/?schema=postgres.public", + " interval: \"PT5M\"", + " sql: \"SELECT * FROM my_table\"" + } + ) + } +) +public class Trigger extends AbstractJdbcTrigger { + @Override + protected AbstractJdbcQuery.Output runQuery(RunContext runContext) throws Exception { + var query = Query.builder() + .id(this.id) + .type(Query.class.getName()) + .url(this.getUrl()) + .username(this.getUsername()) + .password(this.getPassword()) + .timeZoneId(this.getTimeZoneId()) + .sql(this.getSql()) + .store(this.isStore()) + .fetch(this.isFetch()) + .fetchOne(this.isFetchOne()) + .additionalVars(this.additionalVars) + .build(); + return query.run(runContext); + } + + @Override + public void registerDriver() throws SQLException { + DriverManager.registerDriver(new ArrowFlightJdbcDriver()); + } +} diff --git a/plugin-jdbc-arrow-flight/src/main/java/io/kestra/plugin/jdbc/arrowflight/package-info.java b/plugin-jdbc-arrow-flight/src/main/java/io/kestra/plugin/jdbc/arrowflight/package-info.java new file mode 100644 index 00000000..5f7b0b04 --- /dev/null +++ b/plugin-jdbc-arrow-flight/src/main/java/io/kestra/plugin/jdbc/arrowflight/package-info.java @@ -0,0 +1,7 @@ +@PluginSubGroup( + description = "This sub-group of plugins contains tasks for accessing a databases through Apache Arrow Flight SQL.", + categories = PluginSubGroup.PluginCategory.DATABASE +) +package io.kestra.plugin.jdbc.arrowflight; + +import io.kestra.core.models.annotations.PluginSubGroup; \ No newline at end of file diff --git a/plugin-jdbc-arrow-flight/src/main/resources/icons/io.kestra.plugin.jdbc.arrow-flight.svg b/plugin-jdbc-arrow-flight/src/main/resources/icons/io.kestra.plugin.jdbc.arrow-flight.svg new file mode 100644 index 00000000..1cd036e8 --- /dev/null +++ b/plugin-jdbc-arrow-flight/src/main/resources/icons/io.kestra.plugin.jdbc.arrow-flight.svg @@ -0,0 +1,26 @@ + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/plugin-jdbc-arrow-flight/src/main/resources/icons/plugin-icon.svg b/plugin-jdbc-arrow-flight/src/main/resources/icons/plugin-icon.svg new file mode 100644 index 00000000..1cd036e8 --- /dev/null +++ b/plugin-jdbc-arrow-flight/src/main/resources/icons/plugin-icon.svg @@ -0,0 +1,26 @@ + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/plugin-jdbc-dremio/build.gradle b/plugin-jdbc-dremio/build.gradle new file mode 100644 index 00000000..ab31a65f --- /dev/null +++ b/plugin-jdbc-dremio/build.gradle @@ -0,0 +1,20 @@ +project.description = 'Query Dremio database using the Kestra JDBC plugin.' + +jar { + manifest { + attributes( + "X-Kestra-Name": project.name, + "X-Kestra-Title": "Dremio", + "X-Kestra-Group": project.group + ".jdbc.dremio", + "X-Kestra-Description": project.description, + "X-Kestra-Version": project.version + ) + } +} + +dependencies { + implementation("com.dremio.distribution:dremio-jdbc-driver:3.0.6-201812082352540436-1f684f9") + implementation project(':plugin-jdbc') + + testImplementation project(':plugin-jdbc').sourceSets.test.output +} \ No newline at end of file diff --git a/plugin-jdbc-dremio/src/main/java/io/kestra/plugin/jdbc/dremio/DremioCellConverter.java b/plugin-jdbc-dremio/src/main/java/io/kestra/plugin/jdbc/dremio/DremioCellConverter.java new file mode 100644 index 00000000..23f78c58 --- /dev/null +++ b/plugin-jdbc-dremio/src/main/java/io/kestra/plugin/jdbc/dremio/DremioCellConverter.java @@ -0,0 +1,20 @@ +package io.kestra.plugin.jdbc.dremio; + +import io.kestra.plugin.jdbc.AbstractCellConverter; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.time.ZoneId; + +public class DremioCellConverter extends AbstractCellConverter { + + public DremioCellConverter(ZoneId zoneId) { + super(zoneId); + } + + @Override + public Object convertCell(int columnIndex, ResultSet resultSet, Connection connection) throws SQLException { + return super.convert(columnIndex, resultSet); + } +} diff --git a/plugin-jdbc-dremio/src/main/java/io/kestra/plugin/jdbc/dremio/Query.java b/plugin-jdbc-dremio/src/main/java/io/kestra/plugin/jdbc/dremio/Query.java new file mode 100644 index 00000000..8ef44c76 --- /dev/null +++ b/plugin-jdbc-dremio/src/main/java/io/kestra/plugin/jdbc/dremio/Query.java @@ -0,0 +1,57 @@ +package io.kestra.plugin.jdbc.dremio; + +import io.kestra.core.models.annotations.Example; +import io.kestra.core.models.annotations.Plugin; +import io.kestra.core.models.tasks.RunnableTask; +import io.kestra.core.runners.RunContext; +import io.kestra.plugin.jdbc.AbstractCellConverter; +import io.kestra.plugin.jdbc.AbstractJdbcQuery; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.ToString; +import lombok.experimental.SuperBuilder; + +import java.sql.DriverManager; +import java.sql.SQLException; +import java.time.ZoneId; + +@SuperBuilder +@ToString +@EqualsAndHashCode +@Getter +@NoArgsConstructor +@Schema( + title = "Query a Dremio database." +) +@Plugin( + examples = { + @Example( + title = "Send a SQL query to a Dremio database and fetch a row as outputs", + code = { + "url: jdbc:dremio:direct=sql.dremio.cloud:443;ssl=true;PROJECT_ID=sampleProjectId;", + "username: $token", + "password: samplePersonalAccessToken", + "sql: select * FROM source.database.table", + "fetchOne: true", + } + ) + } +) +public class Query extends AbstractJdbcQuery implements RunnableTask { + @Override + protected AbstractCellConverter getCellConverter(ZoneId zoneId) { + return new DremioCellConverter(zoneId); + } + + @Override + public void registerDriver() throws SQLException { + DriverManager.registerDriver(new com.dremio.jdbc.Driver()); + } + + @Override + public Output run(RunContext runContext) throws Exception { + return super.run(runContext); + } +} diff --git a/plugin-jdbc-dremio/src/main/java/io/kestra/plugin/jdbc/dremio/Trigger.java b/plugin-jdbc-dremio/src/main/java/io/kestra/plugin/jdbc/dremio/Trigger.java new file mode 100644 index 00000000..659d6f35 --- /dev/null +++ b/plugin-jdbc-dremio/src/main/java/io/kestra/plugin/jdbc/dremio/Trigger.java @@ -0,0 +1,76 @@ +package io.kestra.plugin.jdbc.dremio; + +import io.kestra.core.models.annotations.Example; +import io.kestra.core.models.annotations.Plugin; +import io.kestra.core.runners.RunContext; +import io.kestra.plugin.jdbc.AbstractJdbcQuery; +import io.kestra.plugin.jdbc.AbstractJdbcTrigger; +import io.swagger.v3.oas.annotations.media.Schema; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.ToString; +import lombok.experimental.SuperBuilder; + +import java.sql.DriverManager; +import java.sql.SQLException; + +@SuperBuilder +@ToString +@EqualsAndHashCode +@Getter +@NoArgsConstructor +@Schema( + title = "Wait for query on a Dremio database." +) +@Plugin( + examples = { + @Example( + title = "Wait for a SQL query to return results and iterate through rows", + full = true, + code = { + "id: jdbc-trigger", + "namespace: io.kestra.tests", + "", + "tasks:", + " - id: each", + " type: io.kestra.core.tasks.flows.EachSequential", + " tasks:", + " - id: return", + " type: io.kestra.core.tasks.debugs.Return", + " format: \"{{json(taskrun.value)}}\"", + " value: \"{{ trigger.rows }}\"", + "", + "triggers:", + " - id: watch", + " type: io.kestra.plugin.jdbc.dremio.Trigger", + " interval: \"PT5M\"", + " sql: \"SELECT * FROM source.database.my_table\"" + } + ) + } +) +public class Trigger extends AbstractJdbcTrigger { + @Override + protected AbstractJdbcQuery.Output runQuery(RunContext runContext) throws Exception { + var query = Query.builder() + .id(this.id) + .type(Query.class.getName()) + .url(this.getUrl()) + .username(this.getUsername()) + .password(this.getPassword()) + .timeZoneId(this.getTimeZoneId()) + .sql(this.getSql()) + .store(this.isStore()) + .fetch(this.isFetch()) + .fetchOne(this.isFetchOne()) + .additionalVars(this.additionalVars) + .build(); + return query.run(runContext); + } + + @Override + public void registerDriver() throws SQLException { + DriverManager.registerDriver(new com.dremio.jdbc.Driver()); + } +} diff --git a/plugin-jdbc-dremio/src/main/java/io/kestra/plugin/jdbc/dremio/package-info.java b/plugin-jdbc-dremio/src/main/java/io/kestra/plugin/jdbc/dremio/package-info.java new file mode 100644 index 00000000..fa043e25 --- /dev/null +++ b/plugin-jdbc-dremio/src/main/java/io/kestra/plugin/jdbc/dremio/package-info.java @@ -0,0 +1,7 @@ +@PluginSubGroup( + description = "This sub-group of plugins contains tasks for accessing the Dremio database.", + categories = PluginSubGroup.PluginCategory.DATABASE +) +package io.kestra.plugin.jdbc.dremio; + +import io.kestra.core.models.annotations.PluginSubGroup; \ No newline at end of file diff --git a/plugin-jdbc-dremio/src/main/resources/icons/io.kestra.plugin.jdbc.dremio.svg b/plugin-jdbc-dremio/src/main/resources/icons/io.kestra.plugin.jdbc.dremio.svg new file mode 100644 index 00000000..9d6ad9ea --- /dev/null +++ b/plugin-jdbc-dremio/src/main/resources/icons/io.kestra.plugin.jdbc.dremio.svg @@ -0,0 +1,54 @@ + + + + + + + + + + + + + + + + + + + + + + + diff --git a/plugin-jdbc-dremio/src/main/resources/icons/plugin-icon.svg b/plugin-jdbc-dremio/src/main/resources/icons/plugin-icon.svg new file mode 100644 index 00000000..9d6ad9ea --- /dev/null +++ b/plugin-jdbc-dremio/src/main/resources/icons/plugin-icon.svg @@ -0,0 +1,54 @@ + + + + + + + + + + + + + + + + + + + + + + + diff --git a/settings.gradle b/settings.gradle index 49eafbf9..c1ea65f0 100644 --- a/settings.gradle +++ b/settings.gradle @@ -14,3 +14,6 @@ include 'plugin-jdbc-sqlserver' include 'plugin-jdbc-trino' include 'plugin-jdbc-vectorwise' include 'plugin-jdbc-vertica' +include 'plugin-jdbc-dremio' +include 'plugin-jdbc-arrow-flight' +