Skip to content

Commit

Permalink
8890 Return useCursorFetch
Browse files Browse the repository at this point in the history
  • Loading branch information
suhomud committed Oct 6, 2022
1 parent ca12f87 commit 8257491
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,12 @@ public JsonNode toDatabaseConfig(final JsonNode config) {
config.get(JdbcUtils.PORT_KEY).asText(),
config.get(JdbcUtils.DATABASE_KEY).asText()));

jdbcUrl.append("?zeroDateTimeBehavior=convertToNull");
// To fetch the result in batches, the "useCursorFetch=true" must be set.
// https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-reference-implementation-notes.html.
// When using this approach MySql creates a temporary table which may have some effect on db
// performance.
jdbcUrl.append("?useCursorFetch=true");
jdbcUrl.append("&zeroDateTimeBehavior=convertToNull");
// ensure the return tinyint(1) is boolean
jdbcUrl.append("&tinyInt1isBit=true");
// ensure the return year value is a Date; see the rationale
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,33 +71,6 @@ public class MySqlSourceOperations extends AbstractJdbcCompatibleSourceOperation
FLOAT, FLOAT_UNSIGNED, DOUBLE, DOUBLE_UNSIGNED, DECIMAL, DECIMAL_UNSIGNED, DATE, DATETIME, TIMESTAMP,
TIME, YEAR, VARCHAR, TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT);

@Override
public JsonNode rowToJson(final ResultSet queryContext) throws SQLException {
// the first call communicates with the database. after that the result is cached.
final java.sql.ResultSetMetaData metadata = queryContext.getMetaData();
final int columnCount = metadata.getColumnCount();
final ObjectNode jsonNode = (ObjectNode) Jsons.jsonNode(Collections.emptyMap());

for (int i = 1; i <= columnCount; i++) {
final String columnType = metadata.getColumnTypeName(i);

if (columnType.equalsIgnoreCase(TIME.getName())) {
// getObject will fail when it tries to parse time with negative value
queryContext.getString(i);
} else {
queryContext.getObject(i);
}
if (queryContext.wasNull()) {
continue;
}

// convert to java types that will convert into reasonable json.
setJsonField(queryContext, i, jsonNode);
}

return jsonNode;
}

/**
* @param colIndex 1-based column index.
*/
Expand Down Expand Up @@ -248,7 +221,7 @@ protected void putDate(final ObjectNode node, final String columnName, final Res

@Override
protected void putTime(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
node.put(columnName, convertToTime(resultSet.getString(index)));
node.put(columnName, DateTimeConverter.convertToTime(getObject(resultSet, index, LocalTime.class)));
}

@Override
Expand Down Expand Up @@ -323,9 +296,4 @@ protected void setTime(final PreparedStatement preparedStatement, final int para
}
}

private String convertToTime(String time) {
time = time.startsWith("-") ? time.substring(1) : time;
return LocalTime.parse(time).format(TIME_FORMATTER);
}

}

0 comments on commit 8257491

Please sign in to comment.