Skip to content

Commit

Permalink
Merge pull request #169 from iNikitaGricenko/arrow-flight-spit
Browse files Browse the repository at this point in the history
New plugins Arrow Flight and Dremio
  • Loading branch information
anna-geller authored Oct 26, 2023
2 parents 582e472 + f0766f1 commit 98cd68b
Show file tree
Hide file tree
Showing 17 changed files with 544 additions and 0 deletions.
1 change: 1 addition & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ subprojects {
mavenCentral()
if (isBuildSnapshot) {
maven { url "https://s01.oss.sonatype.org/content/repositories/snapshots/" }
maven { url "https://maven.dremio.com/free/" }
}
}

Expand Down
7 changes: 7 additions & 0 deletions docker-compose-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,3 +67,10 @@ services:
- QuickStart
- -type
- batch

dremio:
image: dremio/dremio-oss
ports:
- "9047:9047"
- "31010:31010"
- "45678:45678"
20 changes: 20 additions & 0 deletions plugin-jdbc-arrow-flight/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
project.description = 'Query a compatible database using the Kestra Apache Arrow Flight SQL JDBC plugin.'

jar {
manifest {
attributes(
"X-Kestra-Name": project.name,
"X-Kestra-Title": "Arrow Flight SQL",
"X-Kestra-Group": project.group + ".jdbc.arrowflight",
"X-Kestra-Description": project.description,
"X-Kestra-Version": project.version
)
}
}

dependencies {
implementation("org.apache.arrow:flight-sql-jdbc-driver:12.0.1")
implementation project(':plugin-jdbc')

testImplementation project(':plugin-jdbc').sourceSets.test.output
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.kestra.plugin.jdbc.arrowflight;

import io.kestra.plugin.jdbc.AbstractCellConverter;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.ZoneId;

public class ArrowFlightCellConverter extends AbstractCellConverter {

public ArrowFlightCellConverter(ZoneId zoneId) {
super(zoneId);
}

@Override
public Object convertCell(int columnIndex, ResultSet resultSet, Connection connection) throws SQLException {
return super.convert(columnIndex, resultSet);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package io.kestra.plugin.jdbc.arrowflight;

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.jdbc.AbstractCellConverter;
import io.kestra.plugin.jdbc.AbstractJdbcQuery;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
import org.apache.arrow.driver.jdbc.ArrowFlightJdbcDriver;

import java.sql.DriverManager;
import java.sql.SQLException;
import java.time.ZoneId;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "Query a database through Apache Arrow Flight SQL driver."
)
@Plugin(
examples = {
@Example(
title = "Send a SQL query to a database and fetch a row as outputs using Apache Arrow Flight SQL driver",
code = {
"url: jdbc:arrow-flight-sql://localhost:31010/?useEncryption=false",
"username: dremio",
"password: dremio123",
"sql: select * FROM departments",
"fetch: true",
}
),
@Example(
title = "Send a sql query to a Dremio coordinator and fetch a row as outputs using Apache Arrow Flight SQL driver",
code = {
"url: jdbc:arrow-flight-sql://dremio-coordinator:32010/?schema=postgres.public",
"username: $token",
"password: samplePersonalAccessToken",
"sql: select * FROM departments",
"fetch: true",
}
)
}
)
public class Query extends AbstractJdbcQuery implements RunnableTask<AbstractJdbcQuery.Output> {
@Override
protected AbstractCellConverter getCellConverter(ZoneId zoneId) {
return new ArrowFlightCellConverter(zoneId);
}

@Override
public void registerDriver() throws SQLException {
DriverManager.registerDriver(new ArrowFlightJdbcDriver());
}

@Override
public Output run(RunContext runContext) throws Exception {
return super.run(runContext);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package io.kestra.plugin.jdbc.arrowflight;

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.jdbc.AbstractJdbcQuery;
import io.kestra.plugin.jdbc.AbstractJdbcTrigger;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;
import org.apache.arrow.driver.jdbc.ArrowFlightJdbcDriver;

import java.sql.DriverManager;
import java.sql.SQLException;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "Wait for query on a Arrow Flight database."
)
@Plugin(
examples = {
@Example(
title = "Wait for a SQL query to return results and iterate through rows",
full = true,
code = {
"id: jdbc-trigger",
"namespace: io.kestra.tests",
"",
"tasks:",
" - id: each",
" type: io.kestra.core.tasks.flows.EachSequential",
" tasks:",
" - id: return",
" type: io.kestra.core.tasks.debugs.Return",
" format: \"{{json(taskrun.value)}}\"",
" value: \"{{ trigger.rows }}\"",
"",
"triggers:",
" - id: watch",
" type: io.kestra.plugin.jdbc.arrowflight.Trigger",
" url: jdbc:arrow-flight-sql://dremio-coordinator:32010/?schema=postgres.public",
" interval: \"PT5M\"",
" sql: \"SELECT * FROM my_table\""
}
)
}
)
public class Trigger extends AbstractJdbcTrigger {
@Override
protected AbstractJdbcQuery.Output runQuery(RunContext runContext) throws Exception {
var query = Query.builder()
.id(this.id)
.type(Query.class.getName())
.url(this.getUrl())
.username(this.getUsername())
.password(this.getPassword())
.timeZoneId(this.getTimeZoneId())
.sql(this.getSql())
.store(this.isStore())
.fetch(this.isFetch())
.fetchOne(this.isFetchOne())
.additionalVars(this.additionalVars)
.build();
return query.run(runContext);
}

@Override
public void registerDriver() throws SQLException {
DriverManager.registerDriver(new ArrowFlightJdbcDriver());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
@PluginSubGroup(
description = "This sub-group of plugins contains tasks for accessing a databases through Apache Arrow Flight SQL.",
categories = PluginSubGroup.PluginCategory.DATABASE
)
package io.kestra.plugin.jdbc.arrowflight;

import io.kestra.core.models.annotations.PluginSubGroup;
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
26 changes: 26 additions & 0 deletions plugin-jdbc-arrow-flight/src/main/resources/icons/plugin-icon.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
20 changes: 20 additions & 0 deletions plugin-jdbc-dremio/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
project.description = 'Query Dremio database using the Kestra JDBC plugin.'

jar {
manifest {
attributes(
"X-Kestra-Name": project.name,
"X-Kestra-Title": "Dremio",
"X-Kestra-Group": project.group + ".jdbc.dremio",
"X-Kestra-Description": project.description,
"X-Kestra-Version": project.version
)
}
}

dependencies {
implementation("com.dremio.distribution:dremio-jdbc-driver:3.0.6-201812082352540436-1f684f9")
implementation project(':plugin-jdbc')

testImplementation project(':plugin-jdbc').sourceSets.test.output
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.kestra.plugin.jdbc.dremio;

import io.kestra.plugin.jdbc.AbstractCellConverter;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.ZoneId;

public class DremioCellConverter extends AbstractCellConverter {

public DremioCellConverter(ZoneId zoneId) {
super(zoneId);
}

@Override
public Object convertCell(int columnIndex, ResultSet resultSet, Connection connection) throws SQLException {
return super.convert(columnIndex, resultSet);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
package io.kestra.plugin.jdbc.dremio;

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.jdbc.AbstractCellConverter;
import io.kestra.plugin.jdbc.AbstractJdbcQuery;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;
import lombok.experimental.SuperBuilder;

import java.sql.DriverManager;
import java.sql.SQLException;
import java.time.ZoneId;

@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "Query a Dremio database."
)
@Plugin(
examples = {
@Example(
title = "Send a SQL query to a Dremio database and fetch a row as outputs",
code = {
"url: jdbc:dremio:direct=sql.dremio.cloud:443;ssl=true;PROJECT_ID=sampleProjectId;",
"username: $token",
"password: samplePersonalAccessToken",
"sql: select * FROM source.database.table",
"fetchOne: true",
}
)
}
)
public class Query extends AbstractJdbcQuery implements RunnableTask<AbstractJdbcQuery.Output> {
@Override
protected AbstractCellConverter getCellConverter(ZoneId zoneId) {
return new DremioCellConverter(zoneId);
}

@Override
public void registerDriver() throws SQLException {
DriverManager.registerDriver(new com.dremio.jdbc.Driver());
}

@Override
public Output run(RunContext runContext) throws Exception {
return super.run(runContext);
}
}
Loading

0 comments on commit 98cd68b

Please sign in to comment.