Skip to content

Commit

Permalink
🎉 8890 Source MySql: Fix large table issue by fetch size (#17236)
Browse files Browse the repository at this point in the history
* 8890 Fix large table issue by fetch size

* 8890 Bump version

* auto-bump connector version [ci skip]

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
suhomud and octavia-squidington-iii authored Oct 7, 2022
1 parent 371da13 commit 3501abe
Show file tree
Hide file tree
Showing 9 changed files with 44 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -678,7 +678,7 @@
- name: MySQL
sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerRepository: airbyte/source-mysql
dockerImageTag: 1.0.2
dockerImageTag: 1.0.3
documentationUrl: https://docs.airbyte.io/integrations/sources/mysql
icon: mysql.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6965,7 +6965,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-mysql:1.0.2"
- dockerImage: "airbyte/source-mysql:1.0.3"
spec:
documentationUrl: "https://docs.airbyte.io/integrations/sources/mysql"
connectionSpecification:
Expand Down
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ ENV APPLICATION source-mysql

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.2
LABEL io.airbyte.version=1.0.3

LABEL io.airbyte.name=airbyte/source-mysql
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ dependencies {
implementation project(':airbyte-protocol:protocol-models')
implementation project(':airbyte-integrations:connectors:source-relational-db')

implementation 'mysql:mysql-connector-java:8.0.22'
implementation 'mysql:mysql-connector-java:8.0.30'
implementation 'org.apache.commons:commons-lang3:3.11'

testImplementation testFixtures(project(':airbyte-integrations:bases:debezium-v1-9-6'))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import io.airbyte.db.factory.DatabaseDriver;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.db.jdbc.streaming.AdaptiveStreamingQueryConfig;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.base.ssh.SshWrappedSource;
Expand Down Expand Up @@ -72,16 +71,14 @@ public class MySqlSource extends AbstractJdbcSource<MysqlType> implements Source
"useSSL=true",
"requireSSL=true");

public static final String SSL_PARAMETERS_WITH_CERTIFICATE_VALIDATION = "verifyServerCertificate=true";
public static final String SSL_PARAMETERS_WITHOUT_CERTIFICATE_VALIDATION = "verifyServerCertificate=false";
private final FeatureFlags featureFlags;

public static Source sshWrappedSource() {
return new SshWrappedSource(new MySqlSource(), JdbcUtils.HOST_LIST_KEY, JdbcUtils.PORT_LIST_KEY);
}

public MySqlSource() {
super(DRIVER_CLASS, AdaptiveStreamingQueryConfig::new, new MySqlSourceOperations());
super(DRIVER_CLASS, MySqlStreamingQueryConfig::new, new MySqlSourceOperations());
this.featureFlags = new EnvVariableFeatureFlags();
}

Expand Down Expand Up @@ -232,10 +229,10 @@ protected List<AirbyteStateMessage> generateEmptyInitialState(final JsonNode con

@Override
public List<AutoCloseableIterator<AirbyteMessage>> getIncrementalIterators(final JdbcDatabase database,
final ConfiguredAirbyteCatalog catalog,
final Map<String, TableInfo<CommonField<MysqlType>>> tableNameToTable,
final StateManager stateManager,
final Instant emittedAt) {
final ConfiguredAirbyteCatalog catalog,
final Map<String, TableInfo<CommonField<MysqlType>>> tableNameToTable,
final StateManager stateManager,
final Instant emittedAt) {
final JsonNode sourceConfig = database.getSourceConfig();
if (isCdc(sourceConfig) && shouldUseCDC(catalog)) {
final Duration firstRecordWaitTime = CdcConfigurationHelper.getFirstRecordWaitTime(sourceConfig);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import static com.mysql.cj.MysqlType.TINYTEXT;
import static com.mysql.cj.MysqlType.VARCHAR;
import static com.mysql.cj.MysqlType.YEAR;
import static io.airbyte.db.DataTypeUtils.TIME_FORMATTER;
import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_COLUMN_NAME;
import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_COLUMN_SIZE;
import static io.airbyte.db.jdbc.JdbcConstants.INTERNAL_COLUMN_TYPE;
Expand All @@ -43,6 +44,7 @@
import com.mysql.cj.MysqlType;
import com.mysql.cj.jdbc.result.ResultSetMetaData;
import com.mysql.cj.result.Field;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.DataTypeUtils;
import io.airbyte.db.SourceOperations;
import io.airbyte.db.jdbc.AbstractJdbcCompatibleSourceOperations;
Expand All @@ -56,14 +58,15 @@
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.format.DateTimeParseException;
import java.util.Collections;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySqlSourceOperations extends AbstractJdbcCompatibleSourceOperations<MysqlType> implements SourceOperations<ResultSet, MysqlType> {

private static final Logger LOGGER = LoggerFactory.getLogger(MySqlSourceOperations.class);
private static Set<MysqlType> ALLOWED_CURSOR_TYPES = Set.of(TINYINT, TINYINT_UNSIGNED, SMALLINT,
private static final Set<MysqlType> ALLOWED_CURSOR_TYPES = Set.of(TINYINT, TINYINT_UNSIGNED, SMALLINT,
SMALLINT_UNSIGNED, MEDIUMINT, MEDIUMINT_UNSIGNED, INT, INT_UNSIGNED, BIGINT, BIGINT_UNSIGNED,
FLOAT, FLOAT_UNSIGNED, DOUBLE, DOUBLE_UNSIGNED, DECIMAL, DECIMAL_UNSIGNED, DATE, DATETIME, TIMESTAMP,
TIME, YEAR, VARCHAR, TINYTEXT, TEXT, MEDIUMTEXT, LONGTEXT);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/

package io.airbyte.integrations.source.mysql;

import io.airbyte.db.jdbc.streaming.AdaptiveStreamingQueryConfig;
import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class MySqlStreamingQueryConfig extends AdaptiveStreamingQueryConfig {

private static final Logger LOGGER = LoggerFactory.getLogger(MySqlStreamingQueryConfig.class);

public MySqlStreamingQueryConfig() {
super();
}

@Override
public void initialize(final Connection connection, final Statement preparedStatement) throws SQLException {
connection.setAutoCommit(false);
preparedStatement.setFetchSize(Integer.MIN_VALUE);
LOGGER.info("Set initial fetch size: {} rows", preparedStatement.getFetchSize());
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ protected void initTests() {
.fullSourceDataType(fullSourceType)
.airbyteType(JsonSchemaType.STRING_TIME_WITHOUT_TIMEZONE)
// JDBC driver can process only "clock"(00:00:00-23:59:59) values.
.addInsertValues("'-22:59:59'", "'23:59:59'", "'00:00:00'")
.addInsertValues("'-22:59:59'","'23:59:59'", "'00:00:00'")
.addExpectedValues("22:59:59.000000", "23:59:59.000000", "00:00:00.000000")
.build());

Expand Down
1 change: 1 addition & 0 deletions docs/integrations/sources/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,7 @@ WHERE actor_definition_id ='435bb9a5-7887-4809-aa58-28c27df0d7ad' AND (configura
## Changelog
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 1.0.3 | 2022-10-07 | [17236](https://github.com/airbytehq/airbyte/pull/17236) | Fix large table issue by fetch size |
| 1.0.2 | 2022-10-03 | [17170](https://github.com/airbytehq/airbyte/pull/17170) | Make initial CDC waiting time configurable |
| 1.0.1 | 2022-10-01 | [17459](https://github.com/airbytehq/airbyte/pull/17459) | Upgrade debezium version to 1.9.6 from 1.9.2 |
| 1.0.0 | 2022-09-27 | [17164](https://github.com/airbytehq/airbyte/pull/17164) | Certify MySQL Source as Beta |
Expand Down

0 comments on commit 3501abe

Please sign in to comment.