
[Feature][Hive JDBC Source] Support Hive JDBC Source Connector #5424

Merged: 78 commits, Nov 7, 2023

Commits (78 total; the diff below shows changes from 6 commits)
2902241
:sparkles: "Added Hive support to the JDBC connector
Sep 4, 2023
55f9a07
:sparkles: "Added Hive support to the JDBC connector
Sep 4, 2023
5072173
:art: Refactor HiveTypeMapper for JDBC connector
Sep 4, 2023
255cbbc
:sparkles: Add Hive JDBC source connector documentation
Sep 6, 2023
4828e7a
:sparkles: Add JDBC Hive IT and source config to SeaTunnel project
Sep 6, 2023
d4ddd38
:fire: Refactored JDBC configuration filename in e2e-test
Sep 6, 2023
ab7125b
:art: Add BYTE_TYPE mapping in HiveTypeMapper
Sep 8, 2023
6bd08a0
Merge remote-tracking branch 'origin1/dev' into hiveJdbc
Sep 14, 2023
7bc4992
:art: Refactor JDBC connector tests for Hive
Sep 14, 2023
62d4ecf
:art: Update Hive Image and Add Environment Variable
Sep 15, 2023
4d31e34
:art: Refactor styling in JdbcHiveIT file
Sep 15, 2023
069dee2
Added auto_commit to jdbc_hive config file
Sep 15, 2023
008eb26
Update hive.jdbc version and remove 'provided' scope
Sep 15, 2023
f37c375
Merge branch 'apache:dev' into hivejdbc
NickCodeJourney Sep 17, 2023
0b3a237
Merge branch 'apache:dev' into hivejdbc
NickCodeJourney Sep 18, 2023
6c6f15d
Merge branch 'apache:dev' into hivejdbc
NickCodeJourney Sep 25, 2023
15428f6
Merge branch 'apache:dev' into hivejdbc
NickCodeJourney Sep 26, 2023
27bd001
:sparkles: "Added kerberos support in JDBC connections
Sep 26, 2023
b146025
:sparkles: "Added kerberos support in JDBC connections
Sep 26, 2023
26faef0
Merge remote-tracking branch 'origin/hivejdbc' into hivejdbc-1
Sep 26, 2023
1797d83
Add ASF license header to HiveJdbcConnectionProvider file
Sep 26, 2023
147bb80
revert: "Add Kingbase8 dependency to JDBC connector
Sep 26, 2023
0c236f2
:art: Refactor HiveJdbcConnectionProvider and SimpleJdbcConnectionPro…
Sep 26, 2023
01e7aad
:art: Refactor HiveJdbcConnectionProvider and SimpleJdbcConnectionPro…
Sep 26, 2023
17a8fb6
Resolve conflicts
Oct 8, 2023
a1d1eff
Resolve conflicts
Oct 8, 2023
52244c5
Resolve conflicts
Oct 8, 2023
dcf6148
revert
Oct 8, 2023
d2886a0
Merge branch 'apache:dev' into hivejdbc
NickCodeJourney Oct 8, 2023
d553e24
revert
Oct 8, 2023
8c6f1b4
Merge branch 'apache:dev' into hivejdbc
NickCodeJourney Oct 10, 2023
44507bf
:art:
Oct 12, 2023
d44cfd8
Merge remote-tracking branch 'origin/hivejdbc' into hivejdbc-1
Oct 12, 2023
8a9fe61
:art: Refactor kerberos authentication for HiveJdbcConnection
Oct 12, 2023
2e89d3d
Added "provided" scope to hive-service dependency in pom.xml
Oct 19, 2023
e49e08d
:sparkles: Add hive-service JAR to JDBC test setup
Oct 19, 2023
28bcebd
:art: format
Oct 19, 2023
d268f8b
:art: format
Oct 19, 2023
14ead5e
:art: format
Oct 19, 2023
4dd0dc3
Add Hive dependencies to pom.xml
Oct 20, 2023
0079e49
Added Apache Hive JDBC and Hive Service dependencies to the assembly-…
Oct 20, 2023
be99e25
Update Hive JDBC driver URL in JdbcHiveIT test
Oct 20, 2023
077065f
Update Hive JDBC driver URL in JdbcHiveIT test
Oct 20, 2023
355d63f
Update Hive JDBC driver URL in JdbcHiveIT test
Oct 20, 2023
24ef7eb
Update Hive JDBC driver URL in JdbcHiveIT test
Oct 20, 2023
9b04b1f
Update Hive JDBC driver URL in JdbcHiveIT test
Oct 20, 2023
05331c8
Update Hive JDBC driver URL in JdbcHiveIT test
Oct 20, 2023
3c5b455
Update Hive JDBC driver URL in JdbcHiveIT test
Oct 20, 2023
be7b208
Update Hive JDBC driver URL in JdbcHiveIT test
Oct 20, 2023
347b5d9
revert
Oct 20, 2023
53d774c
revert
Oct 20, 2023
7f39cf7
revert
Oct 20, 2023
d92733b
"Remove unnecessary comment in HiveTypeMapper"
Oct 20, 2023
8f788e9
Merge branch 'dev' into hivejdbc
NickCodeJourney Oct 23, 2023
f9850aa
:art: format
Oct 23, 2023
7f7a05f
Merge branch 'apache:dev' into hivejdbc
NickCodeJourney Oct 24, 2023
377d0ac
Merge branch 'dev' into hivejdbc
NickCodeJourney Oct 26, 2023
8725842
resolve conflict
Oct 26, 2023
85fc395
Merge branch 'apache:dev' into hivejdbc
NickCodeJourney Oct 26, 2023
bbd2b3a
:art: resolve conflict
Oct 26, 2023
0bbbbbd
Merge remote-tracking branch 'origin/hivejdbc' into hivejdbc
Oct 26, 2023
e300ea0
add license header
Oct 26, 2023
daec9a5
:fire:
Oct 26, 2023
ecc0ae8
:fire:
Oct 26, 2023
cd396e4
format
Oct 26, 2023
5f0ca3a
Add Hive JDBC dependency and update SimpleJdbcConnectionProvider
Oct 26, 2023
b78c44a
Add Hive service as dependency in pom.xml
Oct 26, 2023
092c0b5
Remove hive-service dependency from pom.xml
Oct 26, 2023
6dc9221
Merge branch 'apache:dev' into hivejdbc
NickCodeJourney Oct 27, 2023
23eaedb
Merge branch 'dev' into hivejdbc
NickCodeJourney Oct 27, 2023
cc1c38f
Merge branch 'apache:dev' into hivejdbc
NickCodeJourney Oct 27, 2023
c601e84
Update dependency in connector-jdbc pom.xml
NickCodeJourney Oct 27, 2023
1db6ce0
Add new error codes to Error Quick Reference Manual
NickCodeJourney Oct 30, 2023
4e00b74
Update default krb5 path in jdbc configs
NickCodeJourney Oct 30, 2023
a31931a
Merge branch 'dev' into hivejdbc
NickCodeJourney Oct 30, 2023
6724258
Add Hive version support information to docs
NickCodeJourney Oct 31, 2023
b8d57b2
Merge remote-tracking branch 'origin/hivejdbc' into hiveJdbc
NickCodeJourney Oct 31, 2023
027b58d
Merge branch 'apache:dev' into hivejdbc
NickCodeJourney Oct 31, 2023
156 changes: 156 additions & 0 deletions docs/en/connector-v2/source/Hive-jdbc.md
@@ -0,0 +1,156 @@
# Hive

> JDBC Hive Source Connector

## Supported Engines

> Spark<br/>
> Flink<br/>
> SeaTunnel Zeta<br/>

## Key Features

- [x] [batch](../../concept/connector-v2-features.md)
- [ ] [stream](../../concept/connector-v2-features.md)
- [ ] [exactly-once](../../concept/connector-v2-features.md)
- [x] [column projection](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [x] [support user-defined split](../../concept/connector-v2-features.md)

> Supports SQL queries, which can be used to achieve column projection.

## Description

Read data from an external data source through JDBC.

## Supported DataSource Info

| Datasource | Supported versions | Driver | Url | Maven |
|------------|----------------------------------------------------------|---------------------------------|--------------------------------------|--------------------------------------------------------------------------|
| Hive | Different dependency versions have different driver classes. | org.apache.hive.jdbc.HiveDriver | jdbc:hive2://localhost:10000/default | [Download](https://mvnrepository.com/artifact/org.apache.hive/hive-jdbc) |

## Database Dependency

> Please download the driver listed under 'Maven' above and copy it to the '$SEATUNNEL_HOME/plugins/jdbc/lib/'
> working directory.<br/>
> For example, for the Hive datasource: cp hive-jdbc-xxx.jar $SEATUNNEL_HOME/plugins/jdbc/lib/

## Data Type Mapping

| Hive Data type | SeaTunnel Data type |
|-------------------------------------------------------------------------------------------|---------------------|
| BOOLEAN | BOOLEAN |
| TINYINT<br/> SMALLINT | SHORT |
| INT<br/>INTEGER | INT |
| BIGINT | LONG |
| FLOAT | FLOAT |
| DOUBLE<br/>DOUBLE PRECISION | DOUBLE |
| DECIMAL(x,y)<br/>NUMERIC(x,y)<br/>(column precision < 38) | DECIMAL(x,y) |
| DECIMAL(x,y)<br/>NUMERIC(x,y)<br/>(column precision > 38; mapping sketched below the table) | DECIMAL(38,18) |
| CHAR<br/>VARCHAR<br/>STRING | STRING |
| DATE | DATE |
| DATETIME<br/>TIMESTAMP | TIMESTAMP |
| BINARY<br/> ARRAY <br/>INTERVAL <br/>MAP <br/>STRUCT<br/>UNIONTYPE | Not supported yet |
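
The DECIMAL/NUMERIC clamping referenced above can be sketched as follows. This is a hypothetical illustration of the mapping rule only; the names `DecimalMappingSketch`, `MAX_PRECISION`, and `mapDecimal` are not part of the connector's actual HiveTypeMapper code:

```java
// Hypothetical sketch of the DECIMAL/NUMERIC mapping rule in the table above.
public class DecimalMappingSketch {
    static final int MAX_PRECISION = 38; // assumed cutoff, per the table

    static String mapDecimal(int precision, int scale) {
        if (precision < MAX_PRECISION) {
            return String.format("DECIMAL(%d,%d)", precision, scale); // keep declared precision/scale
        }
        return "DECIMAL(38,18)"; // oversized precision falls back to the default
    }

    public static void main(String[] args) {
        System.out.println(mapDecimal(10, 2)); // DECIMAL(10,2)
        System.out.println(mapDecimal(40, 5)); // DECIMAL(38,18)
    }
}
```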

## Source Options

| Name | Type | Required | Default | Description |
|------------------------------|------------|----------|-----------------|-------------|
| url | String | Yes | - | The URL of the JDBC connection, e.g. jdbc:hive2://localhost:10000/default |
| driver | String | Yes | - | The JDBC driver class name used to connect to the remote data source; for Hive the value is `org.apache.hive.jdbc.HiveDriver`. |
| user | String | No | - | Connection instance user name |
| password | String | No | - | Connection instance password |
| query | String | Yes | - | Query statement |
| connection_check_timeout_sec | Int | No | 30 | The time in seconds to wait for the database operation used to validate the connection to complete |
| partition_column | String | No | - | The column name used to partition data for parallel reads. Only numeric columns are supported, and only one column can be configured. |
| partition_lower_bound | BigDecimal | No | - | The minimum value of partition_column for the scan; if not set, SeaTunnel will query the database for the minimum value. |
| partition_upper_bound | BigDecimal | No | - | The maximum value of partition_column for the scan; if not set, SeaTunnel will query the database for the maximum value. |
| partition_num | Int | No | job parallelism | The number of partitions; must be a positive integer. Defaults to the job parallelism. |
| fetch_size | Int | No | 0 | For queries that return a large number of rows, you can configure the row fetch size used in the query to improve performance by reducing the number of database round trips needed to satisfy the selection criteria. Zero means use the JDBC driver's default value (see the sketch below this table). |
| common-options | | No | - | Source plugin common parameters; please refer to [Source Common Options](common-options.md) for details |
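
The fetch_size option corresponds to the standard JDBC fetch-size hint. In plain-JDBC terms it behaves roughly like the sketch below; the URL, credentials, and table are placeholders taken from the examples on this page:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

public class FetchSizeDemo {
    public static void main(String[] args) throws Exception {
        // Placeholder connection details; adjust for your HiveServer2 instance.
        try (Connection conn = DriverManager.getConnection(
                        "jdbc:hive2://localhost:10000/default", "hive", "");
                PreparedStatement ps = conn.prepareStatement("select * from type_bin")) {
            ps.setFetchSize(1000); // hint the driver to fetch rows in batches of 1000
            try (ResultSet rs = ps.executeQuery()) {
                while (rs.next()) {
                    // process each row
                }
            }
        }
    }
}
```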

### Tips

> If partition_column is not set, the job will run with single concurrency; if partition_column is set, it will be
> executed in parallel according to the task concurrency. When your shard column is a large numeric type such as
> bigint and the data is not evenly distributed, it is recommended to set the parallelism to 1 to avoid the data
> skew problem.

## Task Example

### Simple:

> This example queries 16 rows from the type_bin table in your test database with single parallelism and reads all
> of its fields. You can also specify which fields to query, with the final results output to the console.

```
# Defining the runtime environment
env {
  # You can set flink configuration here
  execution.parallelism = 2
  job.mode = "BATCH"
}

source {
  Jdbc {
    url = "jdbc:hive2://localhost:10000/default"
    driver = "org.apache.hive.jdbc.HiveDriver"
    connection_check_timeout_sec = 100
    query = "select * from type_bin limit 16"
  }
}

transform {
  # If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
  # please go to https://seatunnel.apache.org/docs/transform-v2/sql
}

sink {
  Console {}
}
```

### Parallel:

> Read your query table in parallel using the shard field and shard data you configure. You can do this if you want
> to read the whole table.

```
source {
  Jdbc {
    url = "jdbc:hive2://localhost:10000/default"
    driver = "org.apache.hive.jdbc.HiveDriver"
    connection_check_timeout_sec = 100
    # Define query logic as required
    query = "select * from type_bin"
    # Shard column for parallel reads
    partition_column = "id"
    # Number of shards
    partition_num = 10
  }
}
```

### Parallel Boundary:

> It is more efficient to specify the upper and lower bounds of the data in the query; reading your data source
> according to the boundaries you configure avoids scanning rows outside the range.

```
source {
  Jdbc {
    url = "jdbc:hive2://localhost:10000/default"
    driver = "org.apache.hive.jdbc.HiveDriver"
    connection_check_timeout_sec = 100
    # Define query logic as required
    query = "select * from type_bin"
    partition_column = "id"
    # Lower bound of the read range
    partition_lower_bound = 1
    # Upper bound of the read range
    partition_upper_bound = 500
    partition_num = 10
  }
}
```
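
A common way to turn these bounds into per-reader ranges is an even split over [partition_lower_bound, partition_upper_bound]. The sketch below shows the idea under that assumption; it is an illustration, not necessarily SeaTunnel's exact splitting algorithm:

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

public class RangeSplitSketch {
    /** Evenly split [lower, upper] into `partitions` half-open ranges. */
    static BigDecimal[][] split(BigDecimal lower, BigDecimal upper, int partitions) {
        BigDecimal step = upper.subtract(lower)
                .add(BigDecimal.ONE)
                .divide(BigDecimal.valueOf(partitions), RoundingMode.CEILING);
        BigDecimal[][] ranges = new BigDecimal[partitions][2];
        BigDecimal start = lower;
        for (int i = 0; i < partitions; i++) {
            ranges[i][0] = start;           // inclusive start
            ranges[i][1] = start.add(step); // exclusive end
            start = start.add(step);
        }
        return ranges;
    }

    public static void main(String[] args) {
        // partition_lower_bound = 1, partition_upper_bound = 500, partition_num = 10
        for (BigDecimal[] r : split(BigDecimal.ONE, BigDecimal.valueOf(500), 10)) {
            System.out.printf("WHERE id >= %s AND id < %s%n", r[0], r[1]);
        }
    }
}
```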

11 changes: 11 additions & 0 deletions seatunnel-connectors-v2/connector-jdbc/pom.xml
@@ -46,6 +46,7 @@
        <snowflake.version>3.13.29</snowflake.version>
        <vertica.version>12.0.3-0</vertica.version>
        <postgis.jdbc.version>2.5.1</postgis.jdbc.version>
        <hive.jdbc.version>3.1.2</hive.jdbc.version>
    </properties>

    <dependencyManagement>
@@ -143,6 +144,12 @@
                <version>${vertica.version}</version>
                <scope>provided</scope>
            </dependency>
            <dependency>
                <groupId>org.apache.hive</groupId>
                <artifactId>hive-jdbc</artifactId>
                <version>${hive.jdbc.version}</version>
[Reviewer comment (Member): ${hive.version}]
                <scope>provided</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

Expand Down Expand Up @@ -218,5 +225,9 @@
<groupId>com.vertica.jdbc</groupId>
<artifactId>vertica-jdbc</artifactId>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
</dependency>
</dependencies>
</project>
@@ -27,8 +27,8 @@ public enum JdbcConnectorErrorCode implements SeaTunnelErrorCode {
     CONNECT_DATABASE_FAILED("JDBC-04", "Connector database failed"),
     TRANSACTION_OPERATION_FAILED(
             "JDBC-05", "transaction operation failed, such as (commit, rollback) etc.."),
-    NO_SUITABLE_DIALECT_FACTORY("JDBC-06", "No suitable dialect factory found");
+    NO_SUITABLE_DIALECT_FACTORY("JDBC-06", "No suitable dialect factory found"),
+    DONT_SUPPORT_SINK("JDBC-07", "The jdbc type doesn't support sink");

     private final String code;

     private final String description;
@@ -0,0 +1,60 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.hive;

import org.apache.seatunnel.connectors.seatunnel.jdbc.config.JdbcSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.JdbcRowConverter;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectTypeMapper;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.Optional;

public class HiveDialect implements JdbcDialect {

    @Override
    public String dialectName() {
        return "HIVE";
    }

    @Override
    public JdbcRowConverter getRowConverter() {
        return new HiveJdbcRowConverter();
    }

    @Override
    public JdbcDialectTypeMapper getJdbcDialectTypeMapper() {
        return new HiveTypeMapper();
    }

    @Override
    public Optional<String> getUpsertStatement(
            String database, String tableName, String[] fieldNames, String[] uniqueKeyFields) {
        // Hive has no native upsert; an empty Optional signals that upsert is unavailable.
        return Optional.empty();
    }

    @Override
    public ResultSetMetaData getResultSetMetaData(
            Connection conn, JdbcSourceConfig jdbcSourceConfig) throws SQLException {
        // Prepare and execute the configured query to obtain its result metadata.
        PreparedStatement ps = conn.prepareStatement(jdbcSourceConfig.getQuery());
        return ps.executeQuery().getMetaData();
    }
}
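
A caller can use getResultSetMetaData to discover the columns a Hive query would return. The snippet below is a minimal, plain-JDBC usage sketch (the connection URL and table are placeholders; JdbcSourceConfig construction is skipped in favor of a raw query string):

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSetMetaData;

public class HiveMetadataSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder URL; adjust for your HiveServer2 instance.
        try (Connection conn =
                        DriverManager.getConnection("jdbc:hive2://localhost:10000/default");
                PreparedStatement ps = conn.prepareStatement("select * from type_bin")) {
            // Same approach as HiveDialect: execute the query and read its metadata.
            ResultSetMetaData meta = ps.executeQuery().getMetaData();
            for (int i = 1; i <= meta.getColumnCount(); i++) {
                System.out.println(meta.getColumnName(i) + " -> " + meta.getColumnTypeName(i));
            }
        }
    }
}
```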
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.hive;

import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;

import com.google.auto.service.AutoService;

/** Factory for {@link HiveDialect}. */
@AutoService(JdbcDialectFactory.class)
public class HiveDialectFactory implements JdbcDialectFactory {

    @Override
    public boolean acceptsURL(String url) {
        // Claim any JDBC URL that targets HiveServer2.
        return url.startsWith("jdbc:hive2:");
    }

    @Override
    public JdbcDialect create() {
        return new HiveDialect();
    }
}
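
Because the factory is registered with @AutoService, it is discoverable through the standard ServiceLoader mechanism. The sketch below is assumed to mirror, in simplified form, how a dialect can be resolved from a JDBC URL:

```java
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialect;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory;

import java.util.ServiceLoader;

public class DialectLookupSketch {
    static JdbcDialect forUrl(String url) {
        // Scan all registered factories and take the first one that accepts the URL.
        for (JdbcDialectFactory factory : ServiceLoader.load(JdbcDialectFactory.class)) {
            if (factory.acceptsURL(url)) {
                return factory.create(); // HiveDialectFactory matches jdbc:hive2: URLs
            }
        }
        throw new IllegalArgumentException("No JDBC dialect found for " + url);
    }

    public static void main(String[] args) {
        System.out.println(forUrl("jdbc:hive2://localhost:10000/default").dialectName()); // HIVE
    }
}
```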
@@ -0,0 +1,42 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.hive;

import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.converter.AbstractJdbcRowConverter;

import java.sql.PreparedStatement;

public class HiveJdbcRowConverter extends AbstractJdbcRowConverter {

    @Override
    public String converterName() {
        return "Hive";
    }

    @Override
    public PreparedStatement toExternal(
            SeaTunnelRowType rowType, SeaTunnelRow row, PreparedStatement statement) {
        // Only the source direction is supported; writing to Hive via JDBC is rejected.
        throw new JdbcConnectorException(
                JdbcConnectorErrorCode.DONT_SUPPORT_SINK,
                "The Hive JDBC connector doesn't support sink");
    }
}
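
Since only the source direction is implemented, any attempt to use the converter on the sink path surfaces the new JDBC-07 error code. A small illustration follows; the null arguments are placeholders, which is safe here only because the method throws unconditionally:

```java
import org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException;
import org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.hive.HiveJdbcRowConverter;

public class SinkUnsupportedSketch {
    public static void main(String[] args) {
        try {
            // Arguments are irrelevant: toExternal always throws for Hive.
            new HiveJdbcRowConverter().toExternal(null, null, null);
        } catch (JdbcConnectorException e) {
            System.out.println(e.getMessage()); // reports the DONT_SUPPORT_SINK (JDBC-07) error
        }
    }
}
```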