Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
yangbinbin committed Nov 13, 2022
1 parent cebb2a7 commit 4b57737
Show file tree
Hide file tree
Showing 8 changed files with 394 additions and 1 deletion.
20 changes: 19 additions & 1 deletion seatunnel-connectors-v2/connector-jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,14 @@

<artifactId>connector-jdbc</artifactId>

<repositories>
<repository>
<id>central-repos1</id>
<name>Central Repository 2</name>
<url>https://repo1.maven.org/maven2/</url>
</repository>
</repositories>

<properties>
<mysql.version>8.0.16</mysql.version>
<postgresql.version>42.4.1</postgresql.version>
Expand All @@ -37,6 +45,7 @@
<phoenix.version>5.2.5-HBase-2.x</phoenix.version>
<oracle.version>12.2.0.1</oracle.version>
<db2.version>db2jcc4</db2.version>
<teradata.version>17.20.00.12</teradata.version>
</properties>

<dependencyManagement>
Expand Down Expand Up @@ -83,7 +92,12 @@
<version>${db2.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.teradata.jdbc</groupId>
<artifactId>terajdbc4</artifactId>
<version>${teradata.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down Expand Up @@ -119,6 +133,10 @@
<groupId>com.ibm.db2.jcc</groupId>
<artifactId>db2jcc</artifactId>
</dependency>
<dependency>
<groupId>com.teradata.jdbc</groupId>
<artifactId>terajdbc4</artifactId>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.teradata;

import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;

public class TeradataDialect implements JdbcDialect {

@Override
public String dialectName() {
return "Teradata";
}

@Override
public JdbcRowConverter getRowConverter() {
return new TeradataJdbcRowConverter();
}

@Override
public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
return new TeradataTypeMapper();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.teradata;

import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;

import com.google.auto.service.AutoService;

@AutoService(JdbcDialectFactory.class)
public class TeradataDialectFactory implements JdbcDialectFactory {

@Override
public boolean acceptsURL(String url) {
return url.startsWith("jdbc:teradata:");
}

@Override
public JdbcDialect create() {
return new TeradataDialect();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.teradata;

import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;

import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;

public class TeradataJdbcRowConverter extends AbstractJdbcRowConverter {

@Override
public String converterName() {
return "Teradata";
}

@Override
public SeaTunnelRow toInternal(ResultSet rs, ResultSetMetaData metaData, SeaTunnelRowType typeInfo) throws SQLException {
return super.toInternal(rs, metaData, typeInfo);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.teradata;

import org.apache.seatunnel.api.table.type.ArrayType;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.DecimalType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;

import java.sql.ResultSetMetaData;
import java.sql.SQLException;

public class TeradataTypeMapper implements JdbcDialectTypeMapper {

// ============================data types=====================

// -------------------------number----------------------------
private static final String TERADATA_BYTEINT = "BYTEINT";
private static final String TERADATA_SMALLINT = "SMALLINT";
private static final String TERADATA_INTEGER = "INTEGER";
private static final String TERADATA_BIGINT = "BIGINT";
private static final String TERADATA_FLOAT = "FLOAT";
private static final String TERADATA_DECIMAL = "DECIMAL";

// -------------------------string----------------------------
private static final String TERADATA_CHAR = "CHAR";
private static final String TERADATA_VARCHAR = "VARCHAR";
private static final String TERADATA_CLOB = "CLOB";


// ---------------------------binary---------------------------
private static final String TERADATA_BYTE = "BYTE";
private static final String TERADATA_VARBYTE = "VARBYTE";

// ------------------------------time-------------------------
private static final String TERADATA_DATE = "DATE";
private static final String TERADATA_TIME = "TIME";
private static final String TERADATA_TIMESTAMP = "TIMESTAMP";

// ------------------------------blob-------------------------
private static final String TERADATA_BLOB = "BLOB";

@Override
public SeaTunnelDataType<?> mapping(ResultSetMetaData metadata, int colIndex) throws SQLException {
String teradataType = metadata.getColumnTypeName(colIndex).toUpperCase();
switch (teradataType) {
case TERADATA_BYTEINT:
return BasicType.BYTE_TYPE;
case TERADATA_SMALLINT:
return BasicType.SHORT_TYPE;
case TERADATA_INTEGER:
return BasicType.INT_TYPE;
case TERADATA_BIGINT:
return BasicType.LONG_TYPE;
case TERADATA_FLOAT:
return BasicType.FLOAT_TYPE;
case TERADATA_DECIMAL:
return new DecimalType(metadata.getPrecision(colIndex), metadata.getScale(colIndex));
case TERADATA_CHAR:
case TERADATA_VARCHAR:
case TERADATA_CLOB:
return PrimitiveByteArrayType.INSTANCE;
case TERADATA_BYTE:
case TERADATA_VARBYTE:
case TERADATA_BLOB:
return ArrayType.BYTE_ARRAY_TYPE;
case TERADATA_DATE:
return LocalTimeType.LOCAL_DATE_TYPE;
case TERADATA_TIME:
return LocalTimeType.LOCAL_TIME_TYPE;
case TERADATA_TIMESTAMP:
return LocalTimeType.LOCAL_DATE_TIME_TYPE;
default:
final String jdbcColumnName = metadata.getColumnName(colIndex);
throw new UnsupportedOperationException(
String.format(
"Doesn't support TERADATA type '%s' on column '%s' yet.",
teradataType, jdbcColumnName));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,11 @@
<artifactId>db2jcc</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.teradata.jdbc</groupId>
<artifactId>terajdbc4</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc;

import org.apache.seatunnel.e2e.common.TestResource;
import org.apache.seatunnel.e2e.common.TestSuiteBase;
import org.apache.seatunnel.e2e.common.container.TestContainer;

import com.teradata.jdbc.TeraDataSource;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;

import java.sql.Connection;
import java.sql.Statement;

@Disabled("Disabled because it needs user's personal teradata account to run this test")
public class JdbcTeradataIT extends TestSuiteBase implements TestResource {
private static final String HOST = "192.168.109.128";
private static final String PORT = "1025";
private static final String USERNAME = "dbc";
private static final String PASSWORD = "dbc";
private static final String DATABASE = "test";
private static final String SINK_TABLE = "sink_table";
private Connection connection;

@TestTemplate
public void testTeradata(TestContainer container) throws Exception {
Container.ExecResult execResult = container.executeJob("/jdbc_teradata_source_and_sink.conf");
Assertions.assertEquals(0, execResult.getExitCode());
clearSinkTable();
}

private void clearSinkTable() {
try (Statement statement = connection.createStatement()) {
statement.execute(String.format("delete from %s", SINK_TABLE));
} catch (Exception e) {
throw new RuntimeException("Test teradata server failed!", e);
}
}

@BeforeAll
@Override
public void startUp() throws Exception {
TeraDataSource teraDataSource = new TeraDataSource();
teraDataSource.setDSName(HOST);
teraDataSource.setDbsPort(PORT);
teraDataSource.setUser(USERNAME);
teraDataSource.setPassword(PASSWORD);
teraDataSource.setDATABASE(DATABASE);
this.connection = teraDataSource.getConnection();
}

@AfterAll
@Override
public void tearDown() throws Exception {
if (connection != null) {
this.connection.close();
}
}
}
Loading

0 comments on commit 4b57737

Please sign in to comment.