Skip to content

Commit

Permalink
feat: implement queries for postgres
Browse files Browse the repository at this point in the history
refactor AbstractJdbcQueries to split queries and commiting separately when non-transactional mode is effective
  • Loading branch information
mgabelle committed Oct 18, 2024
1 parent 39a2544 commit f23e2c0
Show file tree
Hide file tree
Showing 6 changed files with 475 additions and 27 deletions.
14 changes: 14 additions & 0 deletions docker-compose-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,20 @@ services:
ports:
- "64790:3306"

postgres-multi-query:
image: postgres:latest
environment:
POSTGRES_DB: kestra
POSTGRES_USER: postgres
POSTGRES_PASSWORD: pg_passwd
healthcheck:
test: ["CMD-SHELL", "pg_isready -d $${POSTGRES_DB} -U $${POSTGRES_USER}"]
interval: 30s
timeout: 10s
retries: 10
ports:
- "56983:5432"

postgres:
image: bitnami/postgresql:latest
environment:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,6 @@ DROP TABLE IF EXISTS test_transaction;
CREATE TABLE test_transaction
(
id MEDIUMINT NOT NULL AUTO_INCREMENT,
name CHAR(30) NOT NULL,
name VARCHAR(30) NOT NULL,
PRIMARY KEY (id)
);
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package io.kestra.plugin.jdbc.postgresql;

import io.kestra.core.models.annotations.Example;
import io.kestra.core.models.annotations.Plugin;
import io.kestra.core.models.tasks.RunnableTask;
import io.kestra.core.runners.RunContext;
import io.kestra.plugin.jdbc.AbstractCellConverter;
import io.kestra.plugin.jdbc.AbstractJdbcQueries;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.*;
import lombok.experimental.SuperBuilder;

import java.sql.DriverManager;
import java.sql.SQLException;
import java.time.ZoneId;
import java.util.Properties;


@SuperBuilder
@ToString
@EqualsAndHashCode
@Getter
@NoArgsConstructor
@Schema(
title = "Preform multiple queries on a PostgreSQL server."
)
@Plugin(
examples = {
@Example(
full = true,
title = "Execute a query and fetch results in a task.",
code = """
id: postgres_query
namespace: company.team
tasks:
- id: fetch
type: io.kestra.plugin.jdbc.postgresql.Queries
url: jdbc:postgresql://127.0.0.1:56982/
username: pg_user
password: pg_password
sql: |
SELECT firstName, lastName FROM employee;
SELECT brand FROM laptop;
fetchType: FETCH
"""
)
}
)
public class Queries extends AbstractJdbcQueries implements RunnableTask<AbstractJdbcQueries.MultiQueryOutput>, PostgresConnectionInterface {
@Builder.Default
protected Boolean ssl = false;
protected SslMode sslMode;
protected String sslRootCert;
protected String sslCert;
protected String sslKey;
protected String sslKeyPassword;

@Override
public Properties connectionProperties(RunContext runContext) throws Exception {
Properties properties = super.connectionProperties(runContext);
PostgresService.handleSsl(properties, runContext, this);

return properties;
}

@Override
protected AbstractCellConverter getCellConverter(ZoneId zoneId) {
return new PostgresCellConverter(zoneId);
}

@Override
public void registerDriver() throws SQLException {
DriverManager.registerDriver(new org.postgresql.Driver());
}

}
Loading

0 comments on commit f23e2c0

Please sign in to comment.