Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

don't commit GTID to GtidSet until we hit COMMIT #250

Merged
merged 7 commits into from
Dec 10, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 58 additions & 11 deletions src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,14 @@
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.GtidEventData;
import com.github.shyiko.mysql.binlog.event.RotateEventData;
import com.github.shyiko.mysql.binlog.event.QueryEventData;
import com.github.shyiko.mysql.binlog.event.deserialization.ChecksumType;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializer;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import com.github.shyiko.mysql.binlog.event.deserialization.GtidEventDataDeserializer;
import com.github.shyiko.mysql.binlog.event.deserialization.RotateEventDataDeserializer;
import com.github.shyiko.mysql.binlog.event.deserialization.QueryEventDataDeserializer;
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
import com.github.shyiko.mysql.binlog.jmx.BinaryLogClientMXBean;
import com.github.shyiko.mysql.binlog.network.AuthenticationException;
Expand Down Expand Up @@ -133,6 +135,8 @@ public X509Certificate[] getAcceptedIssuers() {
private GtidSet gtidSet;
private final Object gtidSetAccessLock = new Object();
private boolean gtidSetFallbackToPurged;
private String currentGtid;
private boolean inGTIDTransaction;

private EventDeserializer eventDeserializer = new EventDeserializer();

Expand Down Expand Up @@ -552,6 +556,7 @@ public void connect() throws IOException {
synchronized (gtidSetAccessLock) {
if (gtidSet != null) {
ensureEventDataDeserializer(EventType.GTID, GtidEventDataDeserializer.class);
ensureEventDataDeserializer(EventType.QUERY, QueryEventDataDeserializer.class);
}
}
listenForEventPackets();
Expand Down Expand Up @@ -635,6 +640,8 @@ private void enableHeartbeat() throws IOException {
private void requestBinaryLogStream() throws IOException {
long serverId = blocking ? this.serverId : 0; // http://bugs.mysql.com/bug.php?id=71178
Command dumpBinaryLogCommand;
currentGtid = null;
inGTIDTransaction = false;
synchronized (gtidSetAccessLock) {
if (gtidSet != null) {
dumpBinaryLogCommand = new DumpBinaryLogGtidCommand(serverId, binlogFilename, binlogPosition, gtidSet);
Expand Down Expand Up @@ -962,21 +969,61 @@ private void updateClientBinlogFilenameAndPosition(Event event) {
}
}

private EventData unwrapEventData(EventData eventData) {
if (eventData instanceof EventDeserializer.EventDataWrapper) {
return ((EventDeserializer.EventDataWrapper) eventData).getInternal();
} else {
return eventData;
}
}

private void addGtidToSet(String gtid) {
if (gtid == null) {
return;
}

synchronized (gtidSetAccessLock) {
gtidSet.add(currentGtid);
}
}

private void updateGtidSet(Event event) {
synchronized (gtidSetAccessLock) {
if (gtidSet == null) {
return;
}
}

EventHeader eventHeader = event.getHeader();
if (eventHeader.getEventType() == EventType.GTID) {
synchronized (gtidSetAccessLock) {
if (gtidSet != null) {
EventData eventData = event.getData();
GtidEventData gtidEventData;
if (eventData instanceof EventDeserializer.EventDataWrapper) {
gtidEventData = (GtidEventData) ((EventDeserializer.EventDataWrapper) eventData).getInternal();
} else {
gtidEventData = (GtidEventData) eventData;
}
gtidSet.add(gtidEventData.getGtid());
GtidEventData gtidEventData = (GtidEventData) unwrapEventData(event.getData());
currentGtid = gtidEventData.getGtid();
return;
}

switch(eventHeader.getEventType()) {
case XID:
addGtidToSet(currentGtid);
inGTIDTransaction = false;
break;
case QUERY:
QueryEventData qed = (QueryEventData) unwrapEventData(event.getData());
String sql = qed.getSql();
if (sql == null) {
break;
}
}

sql = sql.toUpperCase();
if (sql.startsWith("BEGIN")) {
inGTIDTransaction = true;
} else if (sql.startsWith("COMMIT")) {
addGtidToSet(currentGtid);
inGTIDTransaction = false;
} else if (!inGTIDTransaction) {
//auto-commit query, likely DDL
addGtidToSet(currentGtid);
}
default:
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,161 @@
/*
* Copyright 2013 Stanley Shyiko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.shyiko.mysql.binlog;

import com.github.shyiko.mysql.binlog.event.XidEventData;
import com.github.shyiko.mysql.binlog.event.QueryEventData;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;

import static org.testng.Assert.assertNotEquals;
import static org.testng.AssertJUnit.assertNotNull;

/**
* MySQL replication stream client.
*
* @author <a href="mailto:stanley.shyiko@gmail.com">Stanley Shyiko</a>
*/
public class BinaryLogClientGTIDIntegrationTest extends BinaryLogClientIntegrationTest {
private final Logger logger = Logger.getLogger(getClass().getSimpleName());

@BeforeClass
private void enableGTID() throws SQLException {
MySQLConnection[] servers = {slave, master};
for (MySQLConnection m : servers) {
m.execute(new Callback<Statement>() {
@Override
public void execute(Statement statement) throws SQLException {
ResultSet rs = statement.executeQuery("select @@GLOBAL.GTID_MODE as gtid_mode");
rs.next();
if ("ON".equals(rs.getString("gtid_mode"))) {
return;
}

statement.execute("SET @@GLOBAL.ENFORCE_GTID_CONSISTENCY = ON;");
statement.execute("SET @@GLOBAL.GTID_MODE = OFF_PERMISSIVE;");
statement.execute("SET @@GLOBAL.GTID_MODE = ON_PERMISSIVE;");
statement.execute("SET @@GLOBAL.GTID_MODE = ON;");
}
}, true);
}
}

@AfterClass
private void disableGTID() throws SQLException {
MySQLConnection[] servers = {slave, master};
for (MySQLConnection m : servers) {
m.execute(new Callback<Statement>() {
@Override
public void execute(Statement statement) throws SQLException {
statement.execute("SET @@GLOBAL.GTID_MODE = ON_PERMISSIVE;");
statement.execute("SET @@GLOBAL.GTID_MODE = OFF_PERMISSIVE;");
statement.execute("SET @@GLOBAL.GTID_MODE = OFF;");
statement.execute("SET @@GLOBAL.ENFORCE_GTID_CONSISTENCY = OFF;");
}
}, true);
}
slave.execute("STOP SLAVE", "START SLAVE");
}

@Test
public void testGTIDAdvancesStatementBased() throws Exception {
try {
master.execute("set global binlog_format=statement");
slave.execute("set global binlog_format=statement", "stop slave", "start slave");
master.reconnect();
master.execute("use test");
testGTIDAdvances();
} finally {
master.execute("set global binlog_format=row");
slave.execute("set global binlog_format=row", "stop slave", "start slave");
master.reconnect();
master.execute("use test");
}
}

@Test
public void testGTIDAdvances() throws Exception {
master.execute("CREATE TABLE if not exists foo (i int)");

final String[] initialGTIDSet = new String[1];
master.query("show master status", new Callback<ResultSet>() {
@Override
public void execute(ResultSet rs) throws SQLException {
rs.next();
initialGTIDSet[0] = rs.getString("Executed_Gtid_Set");
}
});

EventDeserializer eventDeserializer = new EventDeserializer();
try {
client.disconnect();
final BinaryLogClient clientWithKeepAlive = new BinaryLogClient(slave.hostname(), slave.port(),
slave.username(), slave.password());

clientWithKeepAlive.setGtidSet(initialGTIDSet[0]);
clientWithKeepAlive.registerEventListener(eventListener);
clientWithKeepAlive.setEventDeserializer(eventDeserializer);
try {
eventListener.reset();
clientWithKeepAlive.connect(DEFAULT_TIMEOUT);

master.execute(new Callback<Statement>() {
@Override
public void execute(Statement statement) throws SQLException {
statement.execute("INSERT INTO foo set i = 2");
statement.execute("INSERT INTO foo set i = 3");
}
});

eventListener.waitFor(XidEventData.class, 1, TimeUnit.SECONDS.toMillis(4));
String gtidSet = clientWithKeepAlive.getGtidSet();
assertNotNull(gtidSet);

eventListener.reset();

master.execute(new Callback<Statement>() {
@Override
public void execute(Statement statement) throws SQLException {
statement.execute("INSERT INTO foo set i = 4");
statement.execute("INSERT INTO foo set i = 5");
}
});

eventListener.waitFor(XidEventData.class, 1, TimeUnit.SECONDS.toMillis(4));
assertNotEquals(client.getGtidSet(), gtidSet);

gtidSet = client.getGtidSet();

eventListener.reset();
master.execute("DROP TABLE IF EXISTS test.bar");
eventListener.waitFor(QueryEventData.class, 1, TimeUnit.SECONDS.toMillis(4));
assertNotEquals(clientWithKeepAlive.getGtidSet(), gtidSet);
} finally {
clientWithKeepAlive.disconnect();
}
} finally {
client.connect(DEFAULT_TIMEOUT);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@
*/
public class BinaryLogClientIntegrationTest {

private static final long DEFAULT_TIMEOUT = TimeUnit.SECONDS.toMillis(3);
protected static final long DEFAULT_TIMEOUT = TimeUnit.SECONDS.toMillis(3);

private final Logger logger = Logger.getLogger(getClass().getSimpleName());

Expand All @@ -107,9 +107,9 @@ public class BinaryLogClientIntegrationTest {

private final TimeZone timeZoneBeforeTheTest = TimeZone.getDefault();

private MySQLConnection master, slave;
private BinaryLogClient client;
private CountDownEventListener eventListener;
protected MySQLConnection master, slave;
protected BinaryLogClient client;
protected CountDownEventListener eventListener;

@BeforeClass
public void setUp() throws Exception {
Expand Down Expand Up @@ -728,24 +728,11 @@ public EventHeaderV4 deserialize(ByteArrayInputStream inputStream) throws IOExce

private void testCommunicationFailureInTheMiddleOfEventDataDeserialization(final IOException ex) throws Exception {
EventDeserializer eventDeserializer = new EventDeserializer();
eventDeserializer.setEventDataDeserializer(EventType.QUERY, new QueryEventDataDeserializer() {

private boolean failureSimulated;

@Override
public QueryEventData deserialize(ByteArrayInputStream inputStream) throws IOException {
QueryEventData eventData = super.deserialize(inputStream);
if (!failureSimulated) {
failureSimulated = true;
throw new SocketException();
}
return eventData;
}
});
eventDeserializer.setEventDataDeserializer(EventType.QUERY, new QueryEventFailureSimulator());
testCommunicationFailure(eventDeserializer);
}

private void testCommunicationFailure(EventDeserializer eventDeserializer) throws Exception {
protected void testCommunicationFailure(EventDeserializer eventDeserializer) throws Exception {
try {
client.disconnect();
final BinaryLogClient clientWithKeepAlive = new BinaryLogClient(slave.hostname, slave.port,
Expand Down Expand Up @@ -1096,17 +1083,23 @@ public String password() {
return password;
}

public void execute(Callback<Statement> callback) throws SQLException {
connection.setAutoCommit(false);
public void execute(Callback<Statement> callback, boolean autocommit) throws SQLException {
connection.setAutoCommit(autocommit);
Statement statement = connection.createStatement();
try {
callback.execute(statement);
connection.commit();
if (!autocommit) {
connection.commit();
}
} finally {
statement.close();
}
}

public void execute(Callback<Statement> callback) throws SQLException {
execute(callback, false);
}

public void execute(final String...statements) throws SQLException {
execute(new Callback<Statement>() {
@Override
Expand Down Expand Up @@ -1142,6 +1135,12 @@ public void close() throws IOException {
throw new IOException(e);
}
}

public void reconnect() throws IOException, SQLException {
close();

this.connection = DriverManager.getConnection("jdbc:mysql://" + hostname + ":" + port, username, password);
}
}

/**
Expand All @@ -1153,4 +1152,22 @@ public interface Callback<T> {

void execute(T obj) throws SQLException;
}

/**
* @author <a href="mailto:stanley.shyiko@gmail.com">Stanley Shyiko</a>
*/
protected class QueryEventFailureSimulator extends QueryEventDataDeserializer {
private boolean failureSimulated;

@Override
public QueryEventData deserialize(ByteArrayInputStream inputStream) throws IOException {
QueryEventData eventData = super.deserialize(inputStream);
if (!failureSimulated) {
failureSimulated = true;
throw new SocketException();
}
return eventData;
}
}

}