Skip to content

Commit 481f63b

Browse files
committed
Fixed #113 - a race condition that could result in duplicate events to be emitted on reconnect
1 parent f608aa8 commit 481f63b

File tree

4 files changed

+196
-56
lines changed

4 files changed

+196
-56
lines changed

Diff for: changelog.md

+5
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,11 @@
22
All notable changes to this project will be documented in this file.
33
This project adheres to [Semantic Versioning](http://semver.org/).
44

5+
## [0.4.2](https://github.com/shyiko/mysql-binlog-connector-java/compare/0.4.1...0.4.2) - 2016-09-19
6+
7+
### Fixed
8+
- A race condition that could result in duplicate events to be emitted on reconnect ([#113](https://github.com/shyiko/mysql-binlog-connector-java/issues/113)).
9+
510
## [0.4.1](https://github.com/shyiko/mysql-binlog-connector-java/compare/0.4.0...0.4.1) - 2016-08-31
611

712
### Fixed

Diff for: src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java

+80-55
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ public X509Certificate[] getAcceptedIssuers() {
140140
private SocketFactory socketFactory;
141141
private SSLSocketFactory sslSocketFactory;
142142

143-
private PacketChannel channel;
143+
private volatile PacketChannel channel;
144144
private volatile boolean connected;
145145

146146
private ThreadFactory threadFactory;
@@ -152,7 +152,10 @@ public X509Certificate[] getAcceptedIssuers() {
152152
private volatile ExecutorService keepAliveThreadExecutor;
153153
private long keepAliveThreadShutdownTimeout = TimeUnit.SECONDS.toMillis(6);
154154

155-
private final Lock shutdownLock = new ReentrantLock();
155+
private final Lock connectLock = new ReentrantLock();
156+
private final Lock disconnectLock = new ReentrantLock();
157+
// used to prevent channel reinitialization after it was closed in #disconnectChannel().
158+
private volatile boolean awaitingConnectTermination;
156159

157160
/**
158161
* Alias for BinaryLogClient("localhost", 3306, <no schema> = null, username, password).
@@ -397,69 +400,80 @@ public void setThreadFactory(ThreadFactory threadFactory) {
397400
* @throws IOException if anything goes wrong while trying to connect
398401
*/
399402
public void connect() throws IOException {
400-
if (connected) {
403+
if (!connectLock.tryLock()) {
401404
throw new IllegalStateException("BinaryLogClient is already connected");
402405
}
403-
GreetingPacket greetingPacket;
404406
try {
405407
try {
406-
Socket socket = socketFactory != null ? socketFactory.createSocket() : new Socket();
407-
socket.connect(new InetSocketAddress(hostname, port));
408-
channel = new PacketChannel(socket);
409-
if (channel.getInputStream().peek() == -1) {
410-
throw new EOFException();
408+
channel = establishConnection();
409+
if (awaitingConnectTermination) {
410+
throw new IOException();
411411
}
412+
GreetingPacket greetingPacket = receiveGreeting();
413+
authenticate(greetingPacket);
414+
connectionId = greetingPacket.getThreadId();
415+
if (binlogFilename == null) {
416+
fetchBinlogFilenameAndPosition();
417+
}
418+
if (binlogPosition < 4) {
419+
if (logger.isLoggable(Level.WARNING)) {
420+
logger.warning("Binary log position adjusted from " + binlogPosition + " to " + 4);
421+
}
422+
binlogPosition = 4;
423+
}
424+
ChecksumType checksumType = fetchBinlogChecksum();
425+
if (checksumType != ChecksumType.NONE) {
426+
confirmSupportOfChecksum(checksumType);
427+
}
428+
requestBinaryLogStream();
412429
} catch (IOException e) {
413-
throw new IOException("Failed to connect to MySQL on " + hostname + ":" + port +
414-
". Please make sure it's running.", e);
415-
}
416-
greetingPacket = receiveGreeting();
417-
authenticate(greetingPacket);
418-
if (binlogFilename == null) {
419-
fetchBinlogFilenameAndPosition();
430+
if (channel != null && channel.isOpen()) {
431+
channel.close();
432+
}
433+
throw e;
420434
}
421-
if (binlogPosition < 4) {
422-
if (logger.isLoggable(Level.WARNING)) {
423-
logger.warning("Binary log position adjusted from " + binlogPosition + " to " + 4);
435+
connected = true;
436+
if (logger.isLoggable(Level.INFO)) {
437+
String position;
438+
synchronized (gtidSetAccessLock) {
439+
position = gtidSet != null ? gtidSet.toString() : binlogFilename + "/" + binlogPosition;
424440
}
425-
binlogPosition = 4;
441+
logger.info("Connected to " + hostname + ":" + port + " at " + position +
442+
" (" + (blocking ? "sid:" + serverId + ", " : "") + "cid:" + connectionId + ")");
426443
}
427-
ChecksumType checksumType = fetchBinlogChecksum();
428-
if (checksumType != ChecksumType.NONE) {
429-
confirmSupportOfChecksum(checksumType);
444+
synchronized (lifecycleListeners) {
445+
for (LifecycleListener lifecycleListener : lifecycleListeners) {
446+
lifecycleListener.onConnect(this);
447+
}
430448
}
431-
requestBinaryLogStream();
432-
} catch (IOException e) {
433-
if (channel != null && channel.isOpen()) {
434-
channel.close();
449+
if (keepAlive && !isKeepAliveThreadRunning()) {
450+
spawnKeepAliveThread();
435451
}
436-
throw e;
437-
}
438-
connected = true;
439-
connectionId = greetingPacket.getThreadId();
440-
if (logger.isLoggable(Level.INFO)) {
441-
String position;
452+
ensureEventDataDeserializer(EventType.ROTATE, RotateEventDataDeserializer.class);
442453
synchronized (gtidSetAccessLock) {
443-
position = gtidSet != null ? gtidSet.toString() : binlogFilename + "/" + binlogPosition;
444-
}
445-
logger.info("Connected to " + hostname + ":" + port + " at " + position +
446-
" (" + (blocking ? "sid:" + serverId + ", " : "") + "cid:" + connectionId + ")");
447-
}
448-
synchronized (lifecycleListeners) {
449-
for (LifecycleListener lifecycleListener : lifecycleListeners) {
450-
lifecycleListener.onConnect(this);
454+
if (gtidSet != null) {
455+
ensureEventDataDeserializer(EventType.GTID, GtidEventDataDeserializer.class);
456+
}
451457
}
458+
listenForEventPackets();
459+
} finally {
460+
connectLock.unlock();
452461
}
453-
if (keepAlive && !isKeepAliveThreadRunning()) {
454-
spawnKeepAliveThread();
455-
}
456-
ensureEventDataDeserializer(EventType.ROTATE, RotateEventDataDeserializer.class);
457-
synchronized (gtidSetAccessLock) {
458-
if (gtidSet != null) {
459-
ensureEventDataDeserializer(EventType.GTID, GtidEventDataDeserializer.class);
462+
}
463+
464+
private PacketChannel establishConnection() throws IOException {
465+
try {
466+
Socket socket = socketFactory != null ? socketFactory.createSocket() : new Socket();
467+
socket.connect(new InetSocketAddress(hostname, port));
468+
PacketChannel channel = new PacketChannel(socket);
469+
if (channel.getInputStream().peek() == -1) {
470+
throw new EOFException();
460471
}
472+
return channel;
473+
} catch (IOException e) {
474+
throw new IOException("Failed to connect to MySQL on " + hostname + ":" + port +
475+
". Please make sure it's running.", e);
461476
}
462-
listenForEventPackets();
463477
}
464478

465479
private GreetingPacket receiveGreeting() throws IOException {
@@ -556,7 +570,7 @@ public void run() {
556570
} catch (InterruptedException e) {
557571
// expected in case of disconnect
558572
}
559-
shutdownLock.lock();
573+
disconnectLock.lock();
560574
try {
561575
if (keepAliveThreadExecutor.isShutdown()) {
562576
return;
@@ -580,7 +594,7 @@ public void run() {
580594
}
581595
}
582596
} finally {
583-
shutdownLock.unlock();
597+
disconnectLock.unlock();
584598
}
585599
}
586600
}
@@ -895,7 +909,7 @@ public void registerLifecycleListener(LifecycleListener lifecycleListener) {
895909
/**
896910
* Unregister all lifecycle listener of specific type.
897911
*/
898-
public synchronized void unregisterLifecycleListener(Class<? extends LifecycleListener> listenerClass) {
912+
public void unregisterLifecycleListener(Class<? extends LifecycleListener> listenerClass) {
899913
synchronized (lifecycleListeners) {
900914
Iterator<LifecycleListener> iterator = lifecycleListeners.iterator();
901915
while (iterator.hasNext()) {
@@ -910,7 +924,7 @@ public synchronized void unregisterLifecycleListener(Class<? extends LifecycleLi
910924
/**
911925
* Unregister single lifecycle listener.
912926
*/
913-
public synchronized void unregisterLifecycleListener(LifecycleListener eventListener) {
927+
public void unregisterLifecycleListener(LifecycleListener eventListener) {
914928
synchronized (lifecycleListeners) {
915929
lifecycleListeners.remove(eventListener);
916930
}
@@ -922,14 +936,14 @@ public synchronized void unregisterLifecycleListener(LifecycleListener eventList
922936
* As the result following {@link #connect()} resumes client from where it left off.
923937
*/
924938
public void disconnect() throws IOException {
925-
shutdownLock.lock();
939+
disconnectLock.lock();
926940
try {
927941
if (isKeepAliveThreadRunning()) {
928942
keepAliveThreadExecutor.shutdownNow();
929943
}
930944
disconnectChannel();
931945
} finally {
932-
shutdownLock.unlock();
946+
disconnectLock.unlock();
933947
}
934948
if (isKeepAliveThreadRunning()) {
935949
waitForKeepAliveThreadToBeTerminated();
@@ -959,6 +973,7 @@ private void disconnectChannel() throws IOException {
959973
channel.close();
960974
}
961975
} finally {
976+
waitForConnectToTerminate();
962977
synchronized (lifecycleListeners) {
963978
for (LifecycleListener lifecycleListener : lifecycleListeners) {
964979
lifecycleListener.onDisconnect(this);
@@ -967,6 +982,16 @@ private void disconnectChannel() throws IOException {
967982
}
968983
}
969984

985+
private void waitForConnectToTerminate() {
986+
awaitingConnectTermination = true;
987+
try {
988+
connectLock.lock();
989+
connectLock.unlock();
990+
} finally {
991+
awaitingConnectTermination = false;
992+
}
993+
}
994+
970995
/**
971996
* {@link BinaryLogClient}'s event listener.
972997
*/

Diff for: src/test/java/com/github/shyiko/mysql/binlog/BinaryLogClientIntegrationTest.java

+110
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,11 @@
2929
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
3030
import com.github.shyiko.mysql.binlog.event.deserialization.EventHeaderV4Deserializer;
3131
import com.github.shyiko.mysql.binlog.event.deserialization.QueryEventDataDeserializer;
32+
import com.github.shyiko.mysql.binlog.io.BufferedSocketInputStream;
3233
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
3334
import com.github.shyiko.mysql.binlog.network.AuthenticationException;
3435
import com.github.shyiko.mysql.binlog.network.ServerException;
36+
import com.github.shyiko.mysql.binlog.network.SocketFactory;
3537
import org.mockito.InOrder;
3638
import org.testng.SkipException;
3739
import org.testng.annotations.AfterClass;
@@ -42,10 +44,15 @@
4244

4345
import java.io.Closeable;
4446
import java.io.EOFException;
47+
import java.io.FilterInputStream;
48+
import java.io.FilterOutputStream;
4549
import java.io.IOException;
50+
import java.io.InputStream;
51+
import java.io.OutputStream;
4652
import java.io.Serializable;
4753
import java.math.BigDecimal;
4854
import java.math.MathContext;
55+
import java.net.Socket;
4956
import java.net.SocketException;
5057
import java.sql.Connection;
5158
import java.sql.DriverManager;
@@ -62,7 +69,10 @@
6269
import java.util.concurrent.CountDownLatch;
6370
import java.util.concurrent.TimeUnit;
6471
import java.util.concurrent.TimeoutException;
72+
import java.util.concurrent.atomic.AtomicBoolean;
6573
import java.util.concurrent.atomic.AtomicReference;
74+
import java.util.concurrent.locks.Lock;
75+
import java.util.concurrent.locks.ReentrantLock;
6676
import java.util.logging.Level;
6777
import java.util.logging.Logger;
6878

@@ -754,6 +764,106 @@ public void execute(Statement statement) throws SQLException {
754764
}
755765
}
756766

767+
@Test
768+
public void testReconnectRaceCondition() throws Exception {
769+
// this test relies on SO_RCVBUF (sysctl -a | grep rcvbuf)
770+
// a more reliable way would be to use buffered 2-level concurrent filter input stream
771+
try {
772+
client.disconnect();
773+
final BinaryLogClient binaryLogClient =
774+
new BinaryLogClient(slave.hostname, slave.port, slave.username, slave.password);
775+
final Lock inputStreamLock = new ReentrantLock();
776+
final AtomicBoolean breakOutputStream = new AtomicBoolean();
777+
binaryLogClient.setSocketFactory(new SocketFactory() {
778+
779+
@Override
780+
public Socket createSocket() throws SocketException {
781+
return new Socket() {
782+
783+
@Override
784+
public InputStream getInputStream() throws IOException {
785+
return new FilterInputStream(new BufferedSocketInputStream(super.getInputStream())) {
786+
787+
@Override
788+
public int read(byte[] b, int off, int len) throws IOException {
789+
int read = super.read(b, off, len);
790+
inputStreamLock.lock();
791+
inputStreamLock.unlock();
792+
return read;
793+
}
794+
};
795+
}
796+
797+
@Override
798+
public OutputStream getOutputStream() throws IOException {
799+
return new FilterOutputStream(super.getOutputStream()) {
800+
801+
@Override
802+
public void write(int b) throws IOException {
803+
if (breakOutputStream.get()) {
804+
binaryLogClient.setSocketFactory(null);
805+
throw new IOException();
806+
}
807+
super.write(b);
808+
}
809+
};
810+
}
811+
};
812+
}
813+
});
814+
binaryLogClient.registerEventListener(eventListener);
815+
binaryLogClient.setKeepAliveInterval(TimeUnit.MILLISECONDS.toMillis(100));
816+
binaryLogClient.connect(DEFAULT_TIMEOUT);
817+
try {
818+
eventListener.waitFor(EventType.FORMAT_DESCRIPTION, 1, DEFAULT_TIMEOUT);
819+
master.execute(new Callback<Statement>() {
820+
@Override
821+
public void execute(Statement statement) throws SQLException {
822+
statement.execute("insert into bikini_bottom values('SpongeBob')");
823+
}
824+
});
825+
eventListener.waitFor(WriteRowsEventData.class, 1, DEFAULT_TIMEOUT);
826+
// lock input stream
827+
inputStreamLock.lock();
828+
// fill input stream buffer
829+
master.execute(new Callback<Statement>() {
830+
@Override
831+
public void execute(Statement statement) throws SQLException {
832+
statement.execute("insert into bikini_bottom values('Patrick')");
833+
statement.execute("insert into bikini_bottom values('Rocky')");
834+
}
835+
});
836+
// trigger reconnect
837+
final CountDownLatch reconnect = new CountDownLatch(1);
838+
binaryLogClient.registerLifecycleListener(new BinaryLogClient.AbstractLifecycleListener() {
839+
840+
@Override
841+
public void onConnect(BinaryLogClient client) {
842+
reconnect.countDown();
843+
}
844+
});
845+
breakOutputStream.set(true);
846+
// wait for connection to be reestablished
847+
reconnect.await(DEFAULT_TIMEOUT, TimeUnit.MILLISECONDS);
848+
// unlock input stream (from previous connection)
849+
inputStreamLock.unlock();
850+
master.execute(new Callback<Statement>() {
851+
@Override
852+
public void execute(Statement statement) throws SQLException {
853+
statement.execute("delete from bikini_bottom where name = 'Patrick'");
854+
}
855+
});
856+
eventListener.waitFor(DeleteRowsEventData.class, 1, DEFAULT_TIMEOUT);
857+
// assert that no events were delivered twice
858+
eventListener.waitFor(WriteRowsEventData.class, 2, DEFAULT_TIMEOUT);
859+
} finally {
860+
binaryLogClient.disconnect();
861+
}
862+
} finally {
863+
client.connect(DEFAULT_TIMEOUT);
864+
}
865+
}
866+
757867
@AfterMethod
758868
public void afterEachTest() throws Exception {
759869
final CountDownLatch latch = new CountDownLatch(1);

Diff for: supplement/codequality/checkstyle.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@
7272
<property name="max" value="120"/>
7373
</module>
7474
<module name="MethodLength">
75-
<property name="max" value="65"/>
75+
<property name="max" value="100"/>
7676
</module>
7777
<module name="ParameterNumber">
7878
<property name="max" value="5"/>

0 commit comments

Comments
 (0)