Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][jdbc splitpk] support manually set the Min/Max value #1541

Open
wants to merge 1 commit into
base: 1.12_release
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ public class JdbcConf extends ChunJunCommonConf implements Serializable {
protected String orderByColumn;
protected String querySql;
protected String splitPk;
protected String splitPkStart;
protected String splitPkEnd;
protected String splitStrategy;
protected int fetchSize = 0;
protected int queryTimeOut = 0;
Expand Down Expand Up @@ -106,6 +108,10 @@ public class JdbcConf extends ChunJunCommonConf implements Serializable {
/** upsert 写数据库时,是否null覆盖原来的值 */
protected boolean allReplace = false;

protected boolean isAutoCommit = false;

private boolean defineColumnTypeForStatement = false;

/**
 * Returns the current value of the {@code initReporter} field.
 *
 * @return the boxed {@code initReporter} flag. NOTE(review): because the type is {@code Boolean}
 *     rather than {@code boolean}, this can presumably be {@code null} when unset; confirm
 *     before unboxing at call sites.
 */
public Boolean getInitReporter() {
    return this.initReporter;
}
Expand Down Expand Up @@ -137,6 +143,22 @@ public String getTable() {
return connection.get(0).getTable().get(0);
}

/**
 * Returns the manually configured start value for the split key.
 *
 * @return the {@code splitPkStart} field, or {@code null} when no value has been assigned
 */
public String getSplitPkStart() {
    return this.splitPkStart;
}

/**
 * Stores the manually configured start value for the split key.
 *
 * @param splitPkStart the value to assign to the {@code splitPkStart} field; stored as given,
 *     without validation
 */
public void setSplitPkStart(String splitPkStart) {
this.splitPkStart = splitPkStart;
}

/**
 * Returns the manually configured end value for the split key.
 *
 * @return the {@code splitPkEnd} field, or {@code null} when no value has been assigned
 */
public String getSplitPkEnd() {
    return this.splitPkEnd;
}

/**
 * Stores the manually configured end value for the split key.
 *
 * @param splitPkEnd the value to assign to the {@code splitPkEnd} field; stored as given,
 *     without validation
 */
public void setSplitPkEnd(String splitPkEnd) {
this.splitPkEnd = splitPkEnd;
}

/**
 * Overwrites the table name held at index 0 of the first connection entry.
 *
 * @param table the table name to store in the first slot of the first connection's tables
 */
public void setTable(String table) {
// Only connection 0 / table slot 0 is written; any other entries are left untouched.
connection.get(0).getTable().set(0, table);
}
Expand Down Expand Up @@ -419,6 +441,14 @@ public void setAllReplace(boolean allReplace) {
this.allReplace = allReplace;
}

/**
 * Returns the value of the {@code isAutoCommit} field.
 *
 * @return the current {@code isAutoCommit} flag
 */
public boolean isAutoCommit() {
    return this.isAutoCommit;
}

/**
 * Returns the value of the {@code defineColumnTypeForStatement} field.
 *
 * @return the current {@code defineColumnTypeForStatement} flag
 */
public boolean isDefineColumnTypeForStatement() {
    return this.defineColumnTypeForStatement;
}

/**
 * Returns the configured split strategy.
 *
 * @return the {@code splitStrategy} field, or {@code null} when no value has been assigned
 */
public String getSplitStrategy() {
    return this.splitStrategy;
}
Expand Down Expand Up @@ -472,6 +502,12 @@ public String toString() {
+ ", splitPk='"
+ splitPk
+ '\''
+ ", splitPkStart='"
+ splitPkStart
+ '\''
+ ", splitPkEnd='"
+ splitPkEnd
+ '\''
+ ", splitStrategy='"
+ splitStrategy
+ '\''
Expand All @@ -485,9 +521,13 @@ public String toString() {
+ increment
+ ", polling="
+ polling
+ ", pollingFromMax="
+ pollingFromMax
+ ", increColumn='"
+ increColumn
+ '\''
+ ", isOrderBy="
+ isOrderBy
+ ", increColumnIndex="
+ increColumnIndex
+ ", increColumnType='"
Expand All @@ -508,6 +548,8 @@ public String toString() {
+ restoreColumnIndex
+ ", useMaxFunc="
+ useMaxFunc
+ ", initReporter="
+ initReporter
+ ", mode='"
+ mode
+ '\''
Expand All @@ -521,6 +563,10 @@ public String toString() {
+ updateKey
+ ", allReplace="
+ allReplace
+ ", isAutoCommit="
+ isAutoCommit
+ ", defineColumnTypeForStatement="
+ defineColumnTypeForStatement
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -393,25 +393,32 @@ private Pair<String, String> getSplitRangeFromDb() {
try {
long startTime = System.currentTimeMillis();

String querySplitRangeSql = SqlUtil.buildQuerySplitRangeSql(jdbcConf, jdbcDialect);
LOG.info(String.format("Query SplitRange sql is '%s'", querySplitRangeSql));

conn = getConnection();
st = conn.createStatement(resultSetType, resultSetConcurrency);
st.setQueryTimeout(jdbcConf.getQueryTimeOut());
rs = st.executeQuery(querySplitRangeSql);
if (rs.next()) {
if (jdbcConf.getSplitPkStart() != null && jdbcConf.getSplitPkEnd() != null) {
splitPkRange =
Pair.of(
String.valueOf(rs.getObject("min_value")),
String.valueOf(rs.getObject("max_value")));
}
String.valueOf(jdbcConf.getSplitPkStart()),
String.valueOf(jdbcConf.getSplitPkEnd()));

LOG.info(
String.format(
"Takes [%s] milliseconds to get the SplitRange value [%s]",
System.currentTimeMillis() - startTime, splitPkRange));
} else {
String querySplitRangeSql = SqlUtil.buildQuerySplitRangeSql(jdbcConf, jdbcDialect);
LOG.info(String.format("Query SplitRange sql is '%s'", querySplitRangeSql));

conn = getConnection();
st = conn.createStatement(resultSetType, resultSetConcurrency);
st.setQueryTimeout(jdbcConf.getQueryTimeOut());
rs = st.executeQuery(querySplitRangeSql);
if (rs.next()) {
splitPkRange =
Pair.of(
String.valueOf(rs.getObject("min_value")),
String.valueOf(rs.getObject("max_value")));
}

LOG.info(
String.format(
"Takes [%s] milliseconds to get the SplitRange value [%s]",
System.currentTimeMillis() - startTime, splitPkRange));
}
return splitPkRange;
} catch (Throwable e) {
throw new ChunJunRuntimeException(
Expand Down Expand Up @@ -707,13 +714,19 @@ protected void executeQuery(String startLocation) throws SQLException {
}
} else {
statement = dbConn.createStatement(resultSetType, resultSetConcurrency);
if (jdbcConf.isDefineColumnTypeForStatement()
&& StringUtils.isBlank(jdbcConf.getCustomSql())) {
defineColumnType(statement);
}
statement.setFetchSize(jdbcConf.getFetchSize());
statement.setQueryTimeout(jdbcConf.getQueryTimeOut());
resultSet = statement.executeQuery(jdbcConf.getQuerySql());
hasNext = resultSet.next();
}
}

/**
 * Extension hook with an empty default body.
 *
 * <p>{@code executeQuery} calls this on a freshly created {@link Statement} when
 * {@code jdbcConf.isDefineColumnTypeForStatement()} is {@code true} and the custom SQL is blank.
 * The call happens before the fetch size and query timeout are applied and before the query
 * runs.
 *
 * @param statement the newly created statement that is about to execute the query
 * @throws SQLException declared so that overriding implementations may propagate driver errors
 */
protected void defineColumnType(Statement statement) throws SQLException {}

/** init prepareStatement */
public void initPrepareStatement(String querySql) throws SQLException {
ps = dbConn.prepareStatement(querySql, resultSetType, resultSetConcurrency);
Expand Down Expand Up @@ -799,7 +812,8 @@ protected Connection getConnection() throws SQLException {
/**
 * Decides whether the custom metric reporter is used to push incremental metrics to an external
 * system (Prometheus, per the original note on this method).
 *
 * <p>A configured reporter is sufficient on its own: when the job is not incremental, the
 * incremental metrics are simply never emitted, so no {@code isIncrement()} check is needed.
 *
 * @return {@code true} only when {@code jdbcConf.getInitReporter()} is {@code Boolean.TRUE};
 *     {@code false} when it is {@code Boolean.FALSE} or {@code null}
 */
@Override
protected boolean useCustomReporter() {
    // getInitReporter() returns a boxed Boolean; compare against TRUE instead of auto-unboxing
    // so that an unset (null) value means "no custom reporter" rather than an NPE.
    return Boolean.TRUE.equals(jdbcConf.getInitReporter());
}

/** 为了保证增量数据的准确性,指标输出失败时使任务失败 */
Expand Down
26 changes: 26 additions & 0 deletions docs_zh/ChunJun连接器/oracle/oracle-source.md
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,32 @@ Oracle 9 及以上
- 默认值:无
<br />

- **splitPkStart**

- 描述:当指定了 `splitPk` 之后,可以通过此参数手动指定 `splitPk` 的下界(最小值),与 **splitPkEnd** 配合使用。两者同时配置后,ChunJun 不再查询数据库获取 `splitPk` 的最小/最大值,而是直接根据这两个值计算每个并行度读取的数据范围。
- 注意:
- 此参数生效的前提是设置了 `splitPk`,并且需要同时设置 **splitPkEnd** 参数。
- 推荐splitPk使用表主键,因为表主键通常情况下比较均匀,因此切分出来的分片也不容易出现数据热点。
- 目前splitPk仅支持整型数据切分,不支持浮点、字符串、日期等其他类型。如果用户指定其他非支持类型,ChunJun将报错。
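- 此参数为可选项:如果没有同时配置 **splitPkStart** 和 **splitPkEnd**,ChunJun 将通过查询数据库获取 `splitPk` 的最小值和最大值(channel 大于 1 时仍必须配置 `splitPk`,否则任务将置为失败)。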
- 必选:否
- 参数类型:String
- 默认值:无
<br />

- **splitPkEnd**

- 描述:当指定了 `splitPk` 之后,可以通过此参数手动指定 `splitPk` 的上界(最大值),与 **splitPkStart** 配合使用。两者同时配置后,ChunJun 不再查询数据库获取 `splitPk` 的最小/最大值,而是直接根据这两个值计算每个并行度读取的数据范围。
- 注意:
- 此参数生效的前提是设置了 `splitPk`,并且需要同时设置 **splitPkStart** 参数。
- 推荐splitPk使用表主键,因为表主键通常情况下比较均匀,因此切分出来的分片也不容易出现数据热点。
- 目前splitPk仅支持整型数据切分,不支持浮点、字符串、日期等其他类型。如果用户指定其他非支持类型,ChunJun将报错。
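- 此参数为可选项:如果没有同时配置 **splitPkStart** 和 **splitPkEnd**,ChunJun 将通过查询数据库获取 `splitPk` 的最小值和最大值(channel 大于 1 时仍必须配置 `splitPk`,否则任务将置为失败)。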
- 必选:否
- 参数类型:String
- 默认值:无
<br />

- **queryTimeOut**

- 描述:查询超时时间,单位秒。
Expand Down