Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 New Destination: Teradata Vantage #19420

Closed
wants to merge 55 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
61c951e
Implemented airbyte destination connector for teradata using java
sc250072 Nov 9, 2022
6dbf3ce
added entry for teradata
sc250072 Nov 9, 2022
a4022ab
Added key words for destination teradata connector
sc250072 Nov 9, 2022
ec780b4
Modified teradata keyword list as per teradata documentation
sc250072 Nov 9, 2022
dcb106f
Merged master branch
sc250072 Nov 10, 2022
b78f300
added teradata specific properties
sc250072 Nov 10, 2022
62b0b6d
teradata jdbc driver is removed from file system as linked ot maven d…
sc250072 Nov 10, 2022
7c115be
Removed unncessary comments
sc250072 Nov 10, 2022
4ecc7e2
more info logs added
sc250072 Nov 11, 2022
96b0fe3
tests issues resolved
sc250072 Nov 11, 2022
fd49f1e
tests issues resolved
sc250072 Nov 11, 2022
f8e1319
tests issues resolved
sc250072 Nov 11, 2022
943ce7c
tests issues resolved
sc250072 Nov 11, 2022
f44b1c5
tests issues resolved
sc250072 Nov 11, 2022
915ff29
tests issues resolved
sc250072 Nov 11, 2022
525de01
tests issues resolved
sc250072 Nov 11, 2022
fffa0d7
tests issues resolved
sc250072 Nov 11, 2022
a33d296
testcase modified
sc250072 Nov 14, 2022
2c1d181
Acceptance tests added
sc250072 Nov 14, 2022
bdcb0d9
Acceptance tests added
sc250072 Nov 14, 2022
4642c50
Testcases failed. woring on fix
sc250072 Nov 14, 2022
a4caaaf
Testcases failed. woring on fix
sc250072 Nov 14, 2022
4d5eca6
Testcases failed. woring on fix
sc250072 Nov 14, 2022
a72bf56
Testcases failed. woring on fix
sc250072 Nov 14, 2022
bc72207
records failed
sc250072 Nov 14, 2022
b029ad7
records failed
sc250072 Nov 14, 2022
68202ef
records failed
sc250072 Nov 14, 2022
a4313b7
records failed
sc250072 Nov 14, 2022
8ab2698
records failed
sc250072 Nov 14, 2022
58762b3
records failed
sc250072 Nov 14, 2022
1a5a655
records failed
sc250072 Nov 14, 2022
6b2a8e9
records failed
sc250072 Nov 14, 2022
836e65b
records failed
sc250072 Nov 14, 2022
3ca9b73
records failed
sc250072 Nov 14, 2022
64b27b4
records failed
sc250072 Nov 14, 2022
d3696b7
records failed
sc250072 Nov 14, 2022
fa4f15e
records failed
sc250072 Nov 14, 2022
161cc3e
records failed
sc250072 Nov 14, 2022
04ecfcc
more logs added to resolve compilation issues resolved
sc250072 Nov 15, 2022
92b219f
more logs added to resolve compilation issues resolved
sc250072 Nov 15, 2022
9a6be28
more logs added to resolve compilation issues resolved
sc250072 Nov 15, 2022
0ce428f
more logs added to resolve compilation issues resolved
sc250072 Nov 15, 2022
4730219
more logs added to resolve compilation issues resolved
sc250072 Nov 15, 2022
c0f2aea
Removed logger info statements
sc250072 Nov 15, 2022
a0b80fc
Removed logger info messages, added for debug
sc250072 Nov 15, 2022
eeee6e2
Format applied
SatishChinthanippuRt Nov 15, 2022
f4ef911
Merge branch 'master' into teradata_java_dest_conn
satish-chinthanippu Nov 15, 2022
55bd2e8
teradata docs updated
satish-chinthanippu Nov 17, 2022
5abec4d
Merge branch 'master' into teradata_java_dest_conn
satish-chinthanippu Nov 17, 2022
7f7e538
Logs added to resolve 67772 bytes, which is greater than the maximum …
satish-chinthanippu Nov 22, 2022
7ffa30e
Merge branch 'teradata_java_dest_conn' of https://github.com/SatishCh…
satish-chinthanippu Nov 22, 2022
0fa98b1
Merge branch 'master' into teradata_java_dest_conn
satish-chinthanippu Dec 6, 2022
27ad1b4
Removed normalization files as Marx suggested to maintain a seperate …
satish-chinthanippu Dec 13, 2022
16cfdb6
reverted normalization changes of teradata
satish-chinthanippu Dec 13, 2022
cfc2ea3
Merge branch 'master' into teradata_java_dest_conn
satish-chinthanippu Dec 13, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -238,4 +238,4 @@ DestinationType getDestinationType() {
return destinationType;
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -68,4 +68,4 @@ public static ImmutablePair<String, DestinationType> getNormalizationInfoForConn
}
}

}
}
3 changes: 3 additions & 0 deletions airbyte-db/db-lib/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ dependencies {

// MongoDB
implementation 'org.mongodb:mongodb-driver-sync:4.3.0'

// Teradata
implementation 'com.teradata.jdbc:terajdbc4:17.20.00.12'

// MySQL
implementation 'mysql:mysql-connector-java:8.0.30'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ public enum DatabaseDriver {
POSTGRESQL("org.postgresql.Driver", "jdbc:postgresql://%s:%d/%s"),
REDSHIFT("com.amazon.redshift.jdbc.Driver", "jdbc:redshift://%s:%d/%s"),
SNOWFLAKE("net.snowflake.client.jdbc.SnowflakeDriver", "jdbc:snowflake://%s/"),
YUGABYTEDB("com.yugabyte.Driver", "jdbc:yugabytedb://%s:%d/%s");
YUGABYTEDB("com.yugabyte.Driver", "jdbc:yugabytedb://%s:%d/%s"),
TERADATA("com.teradata.jdbc.TeraDriver", "jdbc:teradata://%s/");

private final String driverClassName;
private final String urlFormatString;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
*
!Dockerfile
!build
18 changes: 18 additions & 0 deletions airbyte-integrations/connectors/destination-teradata/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
FROM airbyte/integration-base-java:dev AS build

WORKDIR /airbyte
ENV APPLICATION destination-teradata

COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1 && rm -rf ${APPLICATION}.tar

FROM airbyte/integration-base-java:dev

WORKDIR /airbyte
ENV APPLICATION destination-teradata

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.name=airbyte/destination-teradata
68 changes: 68 additions & 0 deletions airbyte-integrations/connectors/destination-teradata/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
# Destination Teradata

This is the repository for the Teradata destination connector in Java.
For information about how to use this connector within Airbyte, see [the User Documentation](https://docs.airbyte.io/integrations/destinations/teradata).

## Local development

#### Building via Gradle
From the Airbyte repository root, run:
```
./gradlew :airbyte-integrations:connectors:destination-teradata:build
```

#### Create credentials
**If you are a community contributor**, generate the necessary credentials and place them in `secrets/config.json` conforming to the spec file in `src/main/resources/spec.json`.
Note that the `secrets` directory is git-ignored by default, so there is no danger of accidentally checking in sensitive information.

**If you are an Airbyte core member**, follow the [instructions](https://docs.airbyte.io/connector-development#using-credentials-in-ci) to set up the credentials.

### Locally running the connector docker image

#### Build
Build the connector image via Gradle:
```
./gradlew :airbyte-integrations:connectors:destination-teradata:airbyteDocker
```
When building via Gradle, the docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in
the Dockerfile.

#### Run
Then run any of the connector commands as follows:
```
docker run --rm airbyte/destination-teradata:dev spec
docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-teradata:dev check --config /secrets/config.json
docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-teradata:dev discover --config /secrets/config.json
docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/destination-teradata:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json
```

## Testing
We use `JUnit` for Java tests.

### Unit and Integration Tests
Place unit tests under `src/test/io/airbyte/integrations/destinations/teradata`.

#### Acceptance Tests
Airbyte has a standard test suite that all destination connectors must pass. Implement the `TODO`s in
`src/test-integration/java/io/airbyte/integrations/destinations/teradataDestinationAcceptanceTest.java`.

### Using gradle to run tests
All commands should be run from airbyte project root.
To run unit tests:
```
./gradlew :airbyte-integrations:connectors:destination-teradata:unitTest
```
To run acceptance and custom integration tests:
```
./gradlew :airbyte-integrations:connectors:destination-teradata:integrationTest
```

## Dependency Management

### Publishing a new version of the connector
You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what?
1. Make sure your changes are passing unit and integration tests.
1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)).
1. Create a Pull Request.
1. Pat yourself on the back for being an awesome contributor.
1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master.
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
plugins {
id 'application'
id 'airbyte-integration-test-java'
}

application {
mainClass = 'io.airbyte.integrations.destination.teradata.TeradataDestination'
applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0']
}

dependencies {
implementation project(':airbyte-config:config-models')
implementation project(':airbyte-protocol:protocol-models')
implementation project(':airbyte-integrations:bases:base-java')
implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
implementation project(':airbyte-db:db-lib')
implementation project(':airbyte-integrations:bases:base-java')
implementation project(':airbyte-protocol:protocol-models')
implementation project(':airbyte-integrations:connectors:destination-jdbc')
implementation 'com.teradata.jdbc:terajdbc4:17.20.00.12'

integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-destination-test')
integrationTestJavaImplementation project(':airbyte-integrations:connectors:destination-teradata')
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
{
"streams": [
{
"destination_sync_mode": "append",
"sync_mode": "full_refresh",
"stream": {
"name": "airbyte_acceptance_table",
"supported_sync_modes": ["full_refresh"],
"source_defined_cursor": false,
"json_schema": {
"type": "object",
"required": ["name"],
"properties": {
"name": {
"type": "string"
},
"age": {
"type": "number"
}
}
}
}
}
]
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.teradata;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
import io.airbyte.integrations.destination.jdbc.AbstractJdbcDestination;
import java.util.Collections;
import java.util.Map;
import java.util.Optional;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TeradataDestination extends AbstractJdbcDestination implements Destination {

private static final Logger LOGGER = LoggerFactory.getLogger(TeradataDestination.class);
/**
* Teradata JDBC driver
*/
public static final String DRIVER_CLASS = "com.teradata.jdbc.TeraDriver";
/**
* Default schema name
*/
public static final String DEFAULT_SCHEMA_NAME = "public";

public static void main(String[] args) throws Exception {
new IntegrationRunner(new TeradataDestination()).run(args);
}

public TeradataDestination() {
super(DRIVER_CLASS, new ExtendedNameTransformer(), new TeradataSqlOperations());
}

@Override
protected Map<String, String> getDefaultConnectionProperties(final JsonNode config) {
return Collections.emptyMap();
}

@Override
public JsonNode toJdbcConfig(final JsonNode config) {
final String schema = Optional.ofNullable(config.get(JdbcUtils.SCHEMA_KEY)).map(JsonNode::asText).orElse(DEFAULT_SCHEMA_NAME);

final String jdbcUrl = String.format("jdbc:teradata://%s/",
config.get(JdbcUtils.HOST_KEY).asText());

final ImmutableMap.Builder<Object, Object> configBuilder = ImmutableMap.builder()
.put(JdbcUtils.USERNAME_KEY, config.get(JdbcUtils.USERNAME_KEY).asText())
.put(JdbcUtils.JDBC_URL_KEY, jdbcUrl)
.put(JdbcUtils.SCHEMA_KEY, schema);

if (config.has(JdbcUtils.PASSWORD_KEY)) {
configBuilder.put(JdbcUtils.PASSWORD_KEY, config.get(JdbcUtils.PASSWORD_KEY).asText());
}

if (config.has(JdbcUtils.JDBC_URL_PARAMS_KEY)) {
configBuilder.put(JdbcUtils.JDBC_URL_PARAMS_KEY, config.get(JdbcUtils.JDBC_URL_PARAMS_KEY).asText());
}
return Jsons.jsonNode(configBuilder.build());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.destination.teradata;

import io.airbyte.commons.json.Jsons;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.base.AirbyteTraceMessageUtility;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.jdbc.JdbcSqlOperations;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import java.sql.*;
import java.sql.SQLException;
import java.sql.Timestamp;
import java.time.Instant;
import java.util.List;
import java.util.UUID;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TeradataSqlOperations extends JdbcSqlOperations {

private static final Logger LOGGER = LoggerFactory.getLogger(TeradataSqlOperations.class);

@Override
public void insertRecordsInternal(final JdbcDatabase database,
final List<AirbyteRecordMessage> records,
final String schemaName,
final String tmpTableName)
throws SQLException {
if (records.isEmpty()) {
return;
}
final String insertQueryComponent = String.format("INSERT INTO %s.%s (%s, %s, %s) VALUES", schemaName, tmpTableName,
JavaBaseConstants.COLUMN_NAME_AB_ID,
JavaBaseConstants.COLUMN_NAME_DATA,
JavaBaseConstants.COLUMN_NAME_EMITTED_AT);
final String recordQueryComponent = " (?, ?, ?)";

database.execute(con -> {
try {

PreparedStatement pstmt = con.prepareStatement(insertQueryComponent + recordQueryComponent);
for (final AirbyteRecordMessage record : records) {
final String uuid = UUID.randomUUID().toString();
final String jsonData = Jsons.serialize(formatData(record.getData()));
final Timestamp emittedAt = Timestamp.from(Instant.ofEpochMilli(record.getEmittedAt()));
pstmt.setString(1, uuid);
LOGGER.debug("uuid: " + uuid);
LOGGER.debug("jsonData: " + jsonData);
LOGGER.debug("jsonData: " + emittedAt);
pstmt.setString(2, jsonData);
pstmt.setTimestamp(3, emittedAt);
pstmt.addBatch();
}
int updateCounts[] = pstmt.executeBatch();
} catch (final SQLException e) {
AirbyteTraceMessageUtility.emitSystemErrorTrace(e,
"Connector failed while inserting records to staging table");
throw new RuntimeException(e);
} catch (final Exception e) {
AirbyteTraceMessageUtility.emitSystemErrorTrace(e,
"Connector failed while inserting records to staging table");
throw new RuntimeException(e);
} finally {

}
});
}

@Override
public void createSchemaIfNotExists(final JdbcDatabase database, final String schemaName) throws Exception {
try {
database.execute(String.format("CREATE DATABASE \"%s\" AS PERM = 1e9;", schemaName));
} catch (SQLException e) {
if (e.getMessage().contains("already exists")) {
LOGGER.warn("Database " + schemaName + " already exists.");
}
}

}

@Override
public void createTableIfNotExists(final JdbcDatabase database, final String schemaName, final String tableName)
throws SQLException {
try {
database.execute(createTableQuery(database, schemaName, tableName));
} catch (SQLException e) {
if (e.getMessage().contains("already exists")) {
LOGGER.warn("Table " + schemaName + "." + tableName + " already exists.");
}
}
}

@Override
public String createTableQuery(final JdbcDatabase database, final String schemaName, final String tableName) {
return String.format(
"CREATE SET TABLE %s.%s, FALLBACK ( \n" + "%s VARCHAR(256), \n" + "%s JSON, \n" + "%s TIMESTAMP(6) \n"
+ ");\n",
schemaName, tableName, JavaBaseConstants.COLUMN_NAME_AB_ID, JavaBaseConstants.COLUMN_NAME_DATA,
JavaBaseConstants.COLUMN_NAME_EMITTED_AT);
}

@Override
public void dropTableIfExists(final JdbcDatabase database, final String schemaName, final String tableName)
throws SQLException {
try {
database.execute(dropTableIfExistsQuery(schemaName, tableName));
} catch (SQLException e) {
AirbyteTraceMessageUtility.emitSystemErrorTrace(e,
"Connector failed while dropping table " + schemaName + "." + tableName);
}
}

@Override
public String truncateTableQuery(final JdbcDatabase database, final String schemaName, final String tableName) {
try {
return String.format("DELETE %s.%s ALL;\n", schemaName, tableName);
} catch (Exception e) {
AirbyteTraceMessageUtility.emitSystemErrorTrace(e,
"Connector failed while truncating table " + schemaName + "." + tableName);
}
return "";
}

private String dropTableIfExistsQuery(final String schemaName, final String tableName) {
try {
return String.format("DROP TABLE %s.%s;\n", schemaName, tableName);
} catch (Exception e) {
AirbyteTraceMessageUtility.emitSystemErrorTrace(e,
"Connector failed while dropping table " + schemaName + "." + tableName);
}
return "";
}

@Override
public void executeTransaction(final JdbcDatabase database, final List<String> queries) throws Exception {
final StringBuilder appendedQueries = new StringBuilder();
try {
for (final String query : queries) {
appendedQueries.append(query);
}
database.execute(appendedQueries.toString());
} catch (SQLException e) {
AirbyteTraceMessageUtility.emitSystemErrorTrace(e,
"Connector failed while executing queries : " + appendedQueries.toString());
}
}

}
Loading