Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🎉 New Source: Teradata #24221

Merged
merged 28 commits into from
Apr 17, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
65d12be
add teradata template
itaseskii Feb 21, 2023
8541d6e
add teradata environment client
itaseskii Feb 23, 2023
ab648ae
Merge branch 'master' into teradata-source
itaseskii Mar 5, 2023
5d0b24d
add teradata source connector
itaseskii Mar 19, 2023
ba85d6b
Merge branch 'master' into teradata-source
itaseskii Mar 19, 2023
c3017ba
Merge branch 'master' into teradata-source
itaseskii Mar 19, 2023
f728b86
Merge branch 'teradata-source' of https://github.com/itaseskii/airbyt…
itaseskii Mar 19, 2023
9e8a62b
Merge branch 'master' into teradata-source
itaseskii Mar 25, 2023
32041bf
add docs
itaseskii Mar 27, 2023
e2cc30d
resolve conflicts
itaseskii Mar 27, 2023
232b069
fix lowercase letter
itaseskii Mar 27, 2023
6405d0a
Merge branch 'master' into teradata-source
itaseskii Mar 27, 2023
9e4f687
Merge branch 'master' into teradata-source
itaseskii Mar 27, 2023
a7d79dc
resolve conflicts
itaseskii Mar 28, 2023
461d5ac
Merge branch 'master' into teradata-source
itaseskii Mar 30, 2023
4faed61
fix compilation error & config test
itaseskii Mar 30, 2023
c71a0a4
Merge branch 'teradata-source' of https://github.com/itaseskii/airbyt…
itaseskii Mar 30, 2023
eee1a4b
fix timestamp
itaseskii Mar 30, 2023
a977c17
Merge branch 'master' into teradata-source
prateekmukhedkar Apr 3, 2023
4a11bef
Merge branch 'master' into teradata-source
itaseskii Apr 3, 2023
a545adc
Merge branch 'master' into teradata-source
itaseskii Apr 4, 2023
3de57fd
generate seed
itaseskii Apr 6, 2023
e13143e
Merge branch 'master' into teradata-source
prateekmukhedkar Apr 6, 2023
c3316d6
resolve conflicts
itaseskii Apr 10, 2023
524a30d
Merge branch 'teradata-source' of https://github.com/itaseskii/airbyt…
itaseskii Apr 10, 2023
347976a
Merge branch 'master' into teradata-source
itaseskii Apr 10, 2023
c2dbe7d
Merge branch 'master' into teradata-source
itaseskii Apr 10, 2023
6dcb7e3
Merge branch 'master' into teradata-source
prateekmukhedkar Apr 13, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,8 @@ protected AirbyteCatalog filterOutOtherSchemas(final AirbyteCatalog catalog) {
@Test
void testDiscoverWithMultipleSchemas() throws Exception {
// clickhouse and mysql do not have a concept of schemas, so this test does not make sense for them.
if (getDriverClass().toLowerCase().contains("mysql") || getDriverClass().toLowerCase().contains("clickhouse")) {
String driverClass = getDriverClass().toLowerCase();
if (driverClass.contains("mysql") || driverClass.contains("clickhouse") || driverClass.contains("teradata")) {
return;
}

Expand Down Expand Up @@ -836,10 +837,11 @@ protected void incrementalCursorCheck(

// See https://github.com/airbytehq/airbyte/issues/14732 for rationale and details.
@Test
void testIncrementalWithConcurrentInsertion() throws Exception {
public void testIncrementalWithConcurrentInsertion() throws Exception {
final String driverName = getDriverClass().toLowerCase();
final String namespace = getDefaultNamespace();
final String fullyQualifiedTableName = getFullyQualifiedTableName(TABLE_NAME_AND_TIMESTAMP);
final String columnDefinition = String.format("name VARCHAR(200) NOT NULL, timestamp %s NOT NULL", COL_TIMESTAMP_TYPE);
final String columnDefinition = String.format("name VARCHAR(200) NOT NULL, %s %s NOT NULL", COL_TIMESTAMP, COL_TIMESTAMP_TYPE);

// 1st sync
database.execute(ctx -> {
Expand Down Expand Up @@ -877,7 +879,12 @@ void testIncrementalWithConcurrentInsertion() throws Exception {
.filter(r -> r.getType() == Type.RECORD)
.map(r -> r.getRecord().getData().get(COL_NAME).asText())
.toList();
assertEquals(List.of("a", "b"), firstSyncNames);
// teradata doesn't make insertion order guarantee when equal ordering value
if (driverName.contains("teradata")) {
assertThat(List.of("a", "b"), Matchers.containsInAnyOrder(firstSyncNames.toArray()));
} else {
assertEquals(List.of("a", "b"), firstSyncNames);
}

// 2nd sync
database.execute(ctx -> {
Expand Down Expand Up @@ -927,7 +934,14 @@ void testIncrementalWithConcurrentInsertion() throws Exception {
.filter(r -> r.getType() == Type.RECORD)
.map(r -> r.getRecord().getData().get(COL_NAME).asText())
.toList();
assertEquals(List.of("c", "d", "e", "f"), thirdSyncExpectedNames);

// teradata doesn't make insertion order guarantee when equal ordering value
if (driverName.contains("teradata")) {
assertThat(List.of("c", "d", "e", "f"), Matchers.containsInAnyOrder(thirdSyncExpectedNames.toArray()));
} else {
assertEquals(List.of("c", "d", "e", "f"), thirdSyncExpectedNames);
}

}

private JsonNode getStateData(final AirbyteMessage airbyteMessage, final String streamName) {
Expand Down Expand Up @@ -1158,7 +1172,8 @@ private String getDefaultSchemaName() {

protected String getDefaultNamespace() {
// mysql does not support schemas. it namespaces using database names instead.
if (getDriverClass().toLowerCase().contains("mysql") || getDriverClass().toLowerCase().contains("clickhouse")) {
if (getDriverClass().toLowerCase().contains("mysql") || getDriverClass().toLowerCase().contains("clickhouse") ||
getDriverClass().toLowerCase().contains("teradata")) {
return config.get(JdbcUtils.DATABASE_KEY).asText();
} else {
return SCHEMA_NAME;
Expand Down
21 changes: 21 additions & 0 deletions airbyte-integrations/connectors/source-teradata/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
FROM airbyte/integration-base-java:dev AS build

WORKDIR /airbyte

ENV APPLICATION source-teradata

COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar

RUN tar xf ${APPLICATION}.tar --strip-components=1 && rm -rf ${APPLICATION}.tar

FROM airbyte/integration-base-java:dev

WORKDIR /airbyte

ENV APPLICATION source-teradata

COPY --from=build /airbyte /airbyte

# Airbyte's build system uses these labels to know what to name and tag the docker images produced by this Dockerfile.
LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.name=airbyte/source-teradata
69 changes: 69 additions & 0 deletions airbyte-integrations/connectors/source-teradata/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
# Source Teradata

This is the repository for the Teradata source connector in Java.
For information about how to use this connector within Airbyte, see [the User Documentation](https://docs.airbyte.com/integrations/sources/teradata).

## Local development

#### Building via Gradle
From the Airbyte repository root, run:
```
./gradlew :airbyte-integrations:connectors:source-teradata:build
```

#### Create credentials
**If you are a community contributor**, generate the necessary credentials and place them in `secrets/config.json` conforming to the spec file in `src/main/resources/spec.json`.
Note that the `secrets` directory is git-ignored by default, so there is no danger of accidentally checking in sensitive information.

**If you are an Airbyte core member**, follow the [instructions](https://docs.airbyte.com/connector-development#using-credentials-in-ci) to set up the credentials.

### Locally running the connector docker image

#### Build
Build the connector image via Gradle:
```
./gradlew :airbyte-integrations:connectors:source-teradata:airbyteDocker
```
When building via Gradle, the docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in
the Dockerfile.

#### Run
Then run any of the connector commands as follows:
```
docker run --rm airbyte/source-teradata:dev spec
docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-teradata:dev check --config /secrets/config.json
docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-teradata:dev discover --config /secrets/config.json
docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/source-teradata:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json
```

## Testing
We use `JUnit` for Java tests.

### Unit and Integration Tests
Place unit tests under `src/test/...`
Place integration tests in `src/test-integration/...`

#### Acceptance Tests
Airbyte has a standard test suite that all source connectors must pass. Implement the `TODO`s in
`src/test-integration/java/io/airbyte/integrations/sources/teradataSourceAcceptanceTest.java`.
grishick marked this conversation as resolved.
Show resolved Hide resolved

### Using gradle to run tests
All commands should be run from airbyte project root.
To run unit tests:
```
./gradlew :airbyte-integrations:connectors:source-teradata:unitTest
```
To run acceptance and custom integration tests:
```
./gradlew :airbyte-integrations:connectors:source-teradata:integrationTest
```

## Dependency Management

### Publishing a new version of the connector
You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what?
1. Make sure your changes are passing unit and integration tests.
1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)).
1. Create a Pull Request.
1. Pat yourself on the back for being an awesome contributor.
1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master.
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
# See [Connector Acceptance Tests](https://docs.airbyte.com/connector-development/testing-connectors/connector-acceptance-tests-reference)
# for more information about how to configure these tests
connector_image: airbyte/source-teradata:dev
acceptance_tests:
spec:
tests:
- spec_path: "src/test-integration/resources/expected_spec.json"
config_path: "src/test-integration/resources/dummy_config.json"
itaseskii marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#!/usr/bin/env sh

# Build latest connector image
docker build . -t $(cat acceptance-test-config.yml | grep "connector_image" | head -n 1 | cut -d: -f2-)

# Pull latest acctest image
docker pull airbyte/connector-acceptance-test:latest

# Run
docker run --rm -it \
-v /var/run/docker.sock:/var/run/docker.sock \
-v /tmp:/tmp \
-v $(pwd):/test_input \
airbyte/connector-acceptance-test \
--acceptance-test-config /test_input
30 changes: 30 additions & 0 deletions airbyte-integrations/connectors/source-teradata/build.gradle
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
plugins {
id 'application'
id 'airbyte-docker'
id 'airbyte-integration-test-java'
id 'airbyte-connector-acceptance-test'
}

application {
mainClass = 'io.airbyte.integrations.source.teradata.TeradataSource'
}

dependencies {
implementation project(':airbyte-db:db-lib')
implementation project(':airbyte-integrations:bases:base-java')
implementation libs.airbyte.protocol
implementation project(':airbyte-integrations:connectors:source-jdbc')
implementation project(':airbyte-integrations:connectors:source-relational-db')

implementation 'com.teradata.jdbc:terajdbc:20.00.00.06'

testImplementation testFixtures(project(':airbyte-integrations:connectors:source-jdbc'))

testImplementation 'org.apache.commons:commons-lang3:3.11'

integrationTestJavaImplementation project(':airbyte-integrations:connectors:source-teradata')
integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-source-test')

implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
integrationTestJavaImplementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#


import pytest

pytest_plugins = ("connector_acceptance_test.plugin",)


@pytest.fixture(scope="session", autouse=True)
def connector_setup():
"""This fixture is a placeholder for external resources that acceptance test might require."""
# TODO: setup test dependencies if needed. otherwise remove the TODO comments
yield
# TODO: clean up test dependencies
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.teradata;

import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.db.jdbc.streaming.AdaptiveStreamingQueryConfig;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
import io.airbyte.integrations.source.relationaldb.TableInfo;
import io.airbyte.protocol.models.CommonField;
import java.io.IOException;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.sql.JDBCType;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TeradataSource extends AbstractJdbcSource<JDBCType> implements Source {

private static final Logger LOGGER = LoggerFactory.getLogger(TeradataSource.class);

private static final int INTERMEDIATE_STATE_EMISSION_FREQUENCY = 10_000;

static final String DRIVER_CLASS = "com.teradata.jdbc.TeraDriver";

public static final String PARAM_MODE = "mode";
public static final String PARAM_SSL = "ssl";
public static final String PARAM_SSL_MODE = "ssl_mode";
public static final String PARAM_SSLMODE = "sslmode";
public static final String PARAM_SSLCA = "sslca";
public static final String REQUIRE = "require";

private static final String CA_CERTIFICATE = "ca.pem";

public TeradataSource() {
super(DRIVER_CLASS, AdaptiveStreamingQueryConfig::new, new TeradataSourceOperations());
}

public static void main(final String[] args) throws Exception {
final Source source = new TeradataSource();
LOGGER.info("starting source: {}", TeradataSource.class);
new IntegrationRunner(source).run(args);
LOGGER.info("completed source: {}", TeradataSource.class);
}

@Override
public JsonNode toDatabaseConfig(final JsonNode config) {
final String schema = config.get(JdbcUtils.DATABASE_KEY).asText();

final String host = config.has(JdbcUtils.PORT_KEY) ?
config.get(JdbcUtils.HOST_KEY).asText() + ":" + config.get(JdbcUtils.PORT_KEY).asInt() :
config.get(JdbcUtils.HOST_KEY).asText();

final String jdbcUrl = String.format("jdbc:teradata://%s/", host);

final ImmutableMap.Builder<Object, Object> configBuilder = ImmutableMap.builder()
.put(JdbcUtils.USERNAME_KEY, config.get(JdbcUtils.USERNAME_KEY).asText())
.put(JdbcUtils.JDBC_URL_KEY, jdbcUrl)
.put(JdbcUtils.SCHEMA_KEY, schema);

if (config.has(JdbcUtils.PASSWORD_KEY)) {
configBuilder.put(JdbcUtils.PASSWORD_KEY, config.get(JdbcUtils.PASSWORD_KEY).asText());
}

if (config.has(JdbcUtils.JDBC_URL_PARAMS_KEY)) {
configBuilder.put(JdbcUtils.JDBC_URL_PARAMS_KEY, config.get(JdbcUtils.JDBC_URL_PARAMS_KEY).asText());
}

return Jsons.jsonNode(configBuilder.build());
}

@Override
public Set<String> getExcludedInternalNameSpaces() {
// the connector requires to have a database explicitly defined
return Set.of("");
}

@Override
protected int getStateEmissionFrequency() {
return INTERMEDIATE_STATE_EMISSION_FREQUENCY;
}

@Override
public List<TableInfo<CommonField<JDBCType>>> discoverInternal(JdbcDatabase database) throws Exception {
return discoverInternal(database, database.getSourceConfig().has(JdbcUtils.DATABASE_KEY) ?
database.getSourceConfig().get(JdbcUtils.DATABASE_KEY).asText() : null);
}

@Override
protected Map<String, String> getDefaultConnectionProperties(JsonNode config) {
itaseskii marked this conversation as resolved.
Show resolved Hide resolved
final Map<String, String> additionalParameters = new HashMap<>();
if (config.has(PARAM_SSL) && config.get(PARAM_SSL).asBoolean()) {
LOGGER.debug("SSL Enabled");
if (config.has(PARAM_SSL_MODE)) {
LOGGER.debug("Selected SSL Mode : {}", config.get(PARAM_SSL_MODE).get(PARAM_MODE).asText());
additionalParameters.putAll(obtainConnectionOptions(config.get(PARAM_SSL_MODE)));
} else {
additionalParameters.put(PARAM_SSLMODE, REQUIRE);
}
}
return additionalParameters;
}

private Map<String, String> obtainConnectionOptions(final JsonNode encryption) {
final Map<String, String> additionalParameters = new HashMap<>();
if (!encryption.isNull()) {
final var method = encryption.get(PARAM_MODE).asText();
switch (method) {
case "verify-ca", "verify-full" -> {
additionalParameters.put(PARAM_SSLMODE, method);
try {
createCertificateFile(CA_CERTIFICATE, encryption.get("ssl_ca_certificate").asText());
} catch (final IOException ioe) {
throw new UncheckedIOException(ioe);
}
additionalParameters.put(PARAM_SSLCA, CA_CERTIFICATE);
}
default -> additionalParameters.put(PARAM_SSLMODE, method);
}
}
return additionalParameters;
}

private static void createCertificateFile(String fileName, String fileValue) throws IOException {
try (final PrintWriter out = new PrintWriter(fileName, StandardCharsets.UTF_8)) {
out.print(fileValue);
}
}


}
Loading