Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New plugins Arrow Flight and Dremio #169

Merged
merged 10 commits into from
Oct 26, 2023
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ subprojects {
mavenCentral()
if (isBuildSnapshot) {
maven { url "https://s01.oss.sonatype.org/content/repositories/snapshots/" }
maven { url "https://maven.dremio.com/free/" }
}
}

Expand Down
7 changes: 7 additions & 0 deletions docker-compose-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,10 @@ services:
- QuickStart
- -type
- batch

dremio:
image: dremio/dremio-oss
ports:
- "9047:9047"
- "31010:31010"
- "45678:45678"
20 changes: 20 additions & 0 deletions plugin-jdbc-arrow-flight/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
project.description = 'Query Dremio databases using the Kestra Apache Arrow Flight JDBC plugin.'
anna-geller marked this conversation as resolved.
Show resolved Hide resolved

jar {
manifest {
attributes(
"X-Kestra-Name": project.name,
"X-Kestra-Title": "Arrow Flight",
anna-geller marked this conversation as resolved.
Show resolved Hide resolved
"X-Kestra-Group": project.group + ".jdbc.arrowflight",
"X-Kestra-Description": project.description,
"X-Kestra-Version": project.version
)
}
}

dependencies {
implementation("org.apache.arrow:flight-sql-jdbc-driver:12.0.1")
implementation project(':plugin-jdbc')

testImplementation project(':plugin-jdbc').sourceSets.test.output
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.kestra.plugin.jdbc.arrowflight;

import io.kestra.plugin.jdbc.AbstractCellConverter;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.ZoneId;

public class ArrowFlightCellConverter extends AbstractCellConverter {

public ArrowFlightCellConverter(ZoneId zoneId) {
super(zoneId);
}

@Override
public Object convertCell(int columnIndex, ResultSet resultSet, Connection connection) throws SQLException {
return super.convert(columnIndex, resultSet);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package io.kestra.plugin.jdbc.arrowflight;

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.jdbc.AbstractCellConverter;
import io.kestra.plugin.jdbc.AbstractJdbcQuery;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
import org.apache.arrow.driver.jdbc.ArrowFlightJdbcDriver;

import java.sql.DriverManager;
import java.sql.SQLException;
import java.time.ZoneId;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "Query a databases through Apache Arrow Flight SQL driver."
anna-geller marked this conversation as resolved.
Show resolved Hide resolved
)
@Plugin(
examples = {
@Example(
title = "Send a sql query to a Dremio direct database and fetch a row as outputs using Apache Arrow Flight SQL driver",
anna-geller marked this conversation as resolved.
Show resolved Hide resolved
code = {
"url: jdbc:arrow-flight-sql://localhost:31010/?useEncryption=false",
"username: dremio",
"password: dremio123",
"sql: select * FROM \"postgres.public\".departments",
anna-geller marked this conversation as resolved.
Show resolved Hide resolved
"fetch: true",
}
),
@Example(
title = "Send a sql query to a Dremio coordinator and fetch a row as outputs using Apache Arrow Flight SQL driver",
code = {
"url: jdbc:arrow-flight-sql://dremio-coordinator:32010/?schema=postgres.public",
"username: $token",
"password: samplePersonalAccessToken",
"sql: select * FROM departments",
"fetch: true",
}
)
}
)
public class Query extends AbstractJdbcQuery implements RunnableTask<AbstractJdbcQuery.Output> {
@Override
protected AbstractCellConverter getCellConverter(ZoneId zoneId) {
return new ArrowFlightCellConverter(zoneId);
}

@Override
public void registerDriver() throws SQLException {
DriverManager.registerDriver(new ArrowFlightJdbcDriver());
}

@Override
public Output run(RunContext runContext) throws Exception {
return super.run(runContext);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package io.kestra.plugin.jdbc.arrowflight;

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.jdbc.AbstractJdbcQuery;
import io.kestra.plugin.jdbc.AbstractJdbcTrigger;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
import org.apache.arrow.driver.jdbc.ArrowFlightJdbcDriver;

import java.sql.DriverManager;
import java.sql.SQLException;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "Wait for query on a Arrow Flight database."
)
@Plugin(
examples = {
@Example(
title = "Wait for a sql query to return results and iterate through rows",
anna-geller marked this conversation as resolved.
Show resolved Hide resolved
full = true,
code = {
"id: jdbc-trigger",
"namespace: io.kestra.tests",
"",
"tasks:",
" - id: each",
" type: io.kestra.core.tasks.flows.EachSequential",
" tasks:",
" - id: return",
" type: io.kestra.core.tasks.debugs.Return",
" format: \"{{json(taskrun.value)}}\"",
" value: \"{{ trigger.rows }}\"",
"",
"triggers:",
" - id: watch",
" type: io.kestra.plugin.jdbc.arrowflight.Trigger",
" url: jdbc:arrow-flight-sql://dremio-coordinator:32010/?schema=postgres.public",
" interval: \"PT5M\"",
" sql: \"SELECT * FROM my_table\""
}
)
}
)
public class Trigger extends AbstractJdbcTrigger {
@Override
protected AbstractJdbcQuery.Output runQuery(RunContext runContext) throws Exception {
var query = Query.builder()
.id(this.id)
.type(Query.class.getName())
.url(this.getUrl())
.username(this.getUsername())
.password(this.getPassword())
.timeZoneId(this.getTimeZoneId())
.sql(this.getSql())
.store(this.isStore())
.fetch(this.isFetch())
.fetchOne(this.isFetchOne())
.additionalVars(this.additionalVars)
.build();
return query.run(runContext);
}

@Override
public void registerDriver() throws SQLException {
DriverManager.registerDriver(new ArrowFlightJdbcDriver());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
@PluginSubGroup(
description = "This sub-group of plugins contains tasks for accessing the databases through Apache Arrow Flight.",
anna-geller marked this conversation as resolved.
Show resolved Hide resolved
categories = PluginSubGroup.PluginCategory.DATABASE
)
package io.kestra.plugin.jdbc.arrowflight;

import io.kestra.core.models.annotations.PluginSubGroup;
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
26 changes: 26 additions & 0 deletions plugin-jdbc-arrow-flight/src/main/resources/icons/plugin-icon.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
20 changes: 20 additions & 0 deletions plugin-jdbc-dremio/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
project.description = 'Query Dremio databases using the Kestra JDBC plugin.'
anna-geller marked this conversation as resolved.
Show resolved Hide resolved

jar {
manifest {
attributes(
"X-Kestra-Name": project.name,
"X-Kestra-Title": "Dremio",
"X-Kestra-Group": project.group + ".jdbc.dremio",
"X-Kestra-Description": project.description,
"X-Kestra-Version": project.version
)
}
}

dependencies {
implementation("com.dremio.distribution:dremio-jdbc-driver:3.0.6-201812082352540436-1f684f9")
implementation project(':plugin-jdbc')

testImplementation project(':plugin-jdbc').sourceSets.test.output
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.kestra.plugin.jdbc.dremio;

import io.kestra.plugin.jdbc.AbstractCellConverter;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.ZoneId;

public class DremioCellConverter extends AbstractCellConverter {

public DremioCellConverter(ZoneId zoneId) {
super(zoneId);
}

@Override
public Object convertCell(int columnIndex, ResultSet resultSet, Connection connection) throws SQLException {
return super.convert(columnIndex, resultSet);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package io.kestra.plugin.jdbc.dremio;

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.jdbc.AbstractCellConverter;
import io.kestra.plugin.jdbc.AbstractJdbcQuery;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;

import java.sql.DriverManager;
import java.sql.SQLException;
import java.time.ZoneId;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "Query a Dremio database."
)
@Plugin(
examples = {
@Example(
title = "Send a sql query to a Dremio database and fetch a row as outputs",
anna-geller marked this conversation as resolved.
Show resolved Hide resolved
code = {
"url: jdbc:dremio:direct=sql.dremio.cloud:443;ssl=true;PROJECT_ID=sampleProjectId;",
"username: $token",
"password: samplePersonalAccessToken",
"sql: select * FROM source.database.table",
"fetchOne: true",
}
)
}
)
public class Query extends AbstractJdbcQuery implements RunnableTask<AbstractJdbcQuery.Output> {
@Override
protected AbstractCellConverter getCellConverter(ZoneId zoneId) {
return new DremioCellConverter(zoneId);
}

@Override
public void registerDriver() throws SQLException {
DriverManager.registerDriver(new com.dremio.jdbc.Driver());
}

@Override
public Output run(RunContext runContext) throws Exception {
return super.run(runContext);
}
}
Loading
Loading