Skip to content

Commit

Permalink
MySQL Source : Check for REQUIRE CLIENT privilege in CDC (#20000)
Browse files Browse the repository at this point in the history
* Test

* Check for REPLICATION CLIENT PRIVILEGE

* format

* undo format

* Fix test + comments

* Simplify checked function

* Bump up version + log

* auto-bump connector version

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
  • Loading branch information
akashkulk and octavia-squidington-iii authored Dec 6, 2022
1 parent b5fdbaf commit 28e2059
Show file tree
Hide file tree
Showing 7 changed files with 41 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -997,7 +997,7 @@
- name: MySQL
sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerRepository: airbyte/source-mysql
dockerImageTag: 1.0.14
dockerImageTag: 1.0.15
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
icon: mysql.svg
sourceType: database
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8790,7 +8790,7 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-mysql:1.0.14"
- dockerImage: "airbyte/source-mysql:1.0.15"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/mysql"
connectionSpecification:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ ENV APPLICATION source-mysql-strict-encrypt

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.14
LABEL io.airbyte.version=1.0.15

LABEL io.airbyte.name=airbyte/source-mysql-strict-encrypt
2 changes: 1 addition & 1 deletion airbyte-integrations/connectors/source-mysql/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ ENV APPLICATION source-mysql

COPY --from=build /airbyte /airbyte

LABEL io.airbyte.version=1.0.14
LABEL io.airbyte.version=1.0.15

LABEL io.airbyte.name=airbyte/source-mysql
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
package io.airbyte.integrations.source.mysql.helpers;

import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.exceptions.ConfigErrorException;
import io.airbyte.commons.functional.CheckedConsumer;
import io.airbyte.db.jdbc.JdbcDatabase;
import java.sql.SQLException;
import java.time.ZoneId;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -38,12 +40,28 @@ public class CdcConfigurationHelper {
* @return list of List<CheckedConsumer<JdbcDatabase, Exception>>
*/
public static List<CheckedConsumer<JdbcDatabase, Exception>> getCheckOperations() {
return List.of(getCheckOperation(LOG_BIN, "ON"),
return List.of(getMasterStatusOperation(),
getCheckOperation(LOG_BIN, "ON"),
getCheckOperation(BINLOG_FORMAT, "ROW"),
getCheckOperation(BINLOG_ROW_IMAGE, "FULL"));

}

// Checks whether the user has REPLICATION CLIENT privilege needed to query status information about
// the binary log files, which are needed for CDC.
private static CheckedConsumer<JdbcDatabase, Exception> getMasterStatusOperation() {
return database -> {
try {
database.unsafeResultSetQuery(
connection -> connection.createStatement().executeQuery("SHOW MASTER STATUS"),
resultSet -> resultSet);
} catch (final SQLException e) {
throw new ConfigErrorException("Please grant REPLICATION CLIENT privilege, so that binary log files are available"
+ " for CDC mode.");
}
};
}

private static CheckedConsumer<JdbcDatabase, Exception> getCheckOperation(final String name, final String value) {
return database -> {
final List<String> result = database.queryStrings(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.debezium.CdcSourceTest;
import io.airbyte.integrations.debezium.CdcTargetPosition;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStateMessage;
Expand Down Expand Up @@ -106,6 +108,10 @@ private void revokeAllPermissions() {
executeQuery("REVOKE ALL PRIVILEGES, GRANT OPTION FROM " + container.getUsername() + "@'%';");
}

private void revokeReplicationClientPermission() {
executeQuery("REVOKE REPLICATION CLIENT ON *.* FROM " + container.getUsername() + "@'%';");
}

private void grantCorrectPermissions() {
executeQuery("GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO " + container.getUsername() + "@'%';");
}
Expand Down Expand Up @@ -213,6 +219,16 @@ protected String randomTableSchema() {
return MODELS_SCHEMA;
}

@Test
protected void syncWithReplicationClientPrivilegeRevokedFailsCheck() throws Exception {
revokeReplicationClientPermission();
final AirbyteConnectionStatus status = getSource().check(getConfig());
final String expectedErrorMessage = "Please grant REPLICATION CLIENT privilege, so that binary log files are available"
+ " for CDC mode.";
assertTrue(status.getStatus().equals(Status.FAILED));
assertTrue(status.getMessage().contains(expectedErrorMessage));
}

@Test
protected void syncShouldHandlePurgedLogsGracefully() throws Exception {

Expand Down
3 changes: 2 additions & 1 deletion docs/integrations/sources/mysql.md
Original file line number Diff line number Diff line change
Expand Up @@ -255,7 +255,8 @@ WHERE actor_definition_id ='435bb9a5-7887-4809-aa58-28c27df0d7ad' AND (configura
## Changelog

| Version | Date | Pull Request | Subject |
| :------ | :--------- | :--------------------------------------------------------- | :----------------------------------------------------------------------------------------------------------------------------------------------- |
|:--------|:-----------| :--------------------------------------------------------- |:-------------------------------------------------------------------------------------------------------------------------------------------------|
| 1.0.15 | 2022-12-06 | [20000](https://github.com/airbytehq/airbyte/pull/20000) | Add check and better messaging when user does not have permission to access binary log in CDC mode |
| 1.0.14 | 2022-11-22 | [19514](https://github.com/airbytehq/airbyte/pull/19514) | Adjust batch selection memory limits databases. |
| 1.0.13 | 2022-11-14 | [18956](https://github.com/airbytehq/airbyte/pull/18956) | Clean up Tinyint Unsigned data type identification |
| 1.0.12 | 2022-11-07 | [19025](https://github.com/airbytehq/airbyte/pull/19025) | Stop enforce SSL if ssl mode is disabled |
Expand Down

0 comments on commit 28e2059

Please sign in to comment.