Skip to content

Commit

Permalink
Added support for COM_BINLOG_DUMP_GTID (resolves #41)
Browse files Browse the repository at this point in the history
  • Loading branch information
shyiko committed Apr 30, 2015
1 parent 633d5d2 commit 3f30768
Show file tree
Hide file tree
Showing 4 changed files with 434 additions and 6 deletions.
94 changes: 88 additions & 6 deletions src/main/java/com/github/shyiko/mysql/binlog/BinaryLogClient.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
import com.github.shyiko.mysql.binlog.event.EventHeader;
import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
import com.github.shyiko.mysql.binlog.event.EventType;
import com.github.shyiko.mysql.binlog.event.GtidEventData;
import com.github.shyiko.mysql.binlog.event.RotateEventData;
import com.github.shyiko.mysql.binlog.event.deserialization.ChecksumType;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializationException;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDataDeserializer;
import com.github.shyiko.mysql.binlog.event.deserialization.EventDeserializer;
import com.github.shyiko.mysql.binlog.event.deserialization.GtidEventDataDeserializer;
import com.github.shyiko.mysql.binlog.event.deserialization.RotateEventDataDeserializer;
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;
import com.github.shyiko.mysql.binlog.jmx.BinaryLogClientMXBean;
Expand All @@ -35,7 +37,9 @@
import com.github.shyiko.mysql.binlog.network.protocol.PacketChannel;
import com.github.shyiko.mysql.binlog.network.protocol.ResultSetRowPacket;
import com.github.shyiko.mysql.binlog.network.protocol.command.AuthenticateCommand;
import com.github.shyiko.mysql.binlog.network.protocol.command.Command;
import com.github.shyiko.mysql.binlog.network.protocol.command.DumpBinaryLogCommand;
import com.github.shyiko.mysql.binlog.network.protocol.command.DumpBinaryLogGtidCommand;
import com.github.shyiko.mysql.binlog.network.protocol.command.PingCommand;
import com.github.shyiko.mysql.binlog.network.protocol.command.QueryCommand;

Expand Down Expand Up @@ -80,6 +84,9 @@ public class BinaryLogClient implements BinaryLogClientMXBean {
private volatile String binlogFilename;
private volatile long binlogPosition = 4;

private GtidSet gtidSet;
private final Object gtidSetAccessLock = new Object();

private EventDeserializer eventDeserializer = new EventDeserializer();

private final List<EventListener> eventListeners = new LinkedList<EventListener>();
Expand Down Expand Up @@ -200,6 +207,34 @@ public void setBinlogPosition(long binlogPosition) {
this.binlogPosition = binlogPosition;
}

/**
* @return GTID set. Note that this value changes with each received GTID event (provided client is in GTID mode).
* @see #setGtidSet(String)
*/
public String getGtidSet() {
synchronized (gtidSetAccessLock) {
return gtidSet != null ? gtidSet.toString() : null;
}
}

/**
* @param gtidSet GTID set (can be an empty string).
* <p>NOTE #1: Any value but null will switch BinaryLogClient into a GTID mode (in which case GTID set will be
* updated with each incoming GTID event) as well as set binlogFilename to "" (empty string) (meaning
* BinaryLogClient will request events "outside of the set" <u>starting from the oldest known binlog</u>).
* <p>NOTE #2: {@link #setBinlogFilename(String)} and {@link #setBinlogPosition(long)} can be used to specify the
* exact position from which MySQL server should start streaming events (taking into account GTID set).
* @see #getGtidSet()
*/
public void setGtidSet(String gtidSet) {
if (gtidSet != null && this.binlogFilename == null) {
this.binlogFilename = "";
}
synchronized (gtidSetAccessLock) {
this.gtidSet = gtidSet != null ? new GtidSet(gtidSet) : null;
}
}

/**
* @return true if "keep alive" thread should be automatically started (default), false otherwise.
* @see #setKeepAlive(boolean)
Expand Down Expand Up @@ -309,7 +344,7 @@ public void connect() throws IOException {
if (checksumType != ChecksumType.NONE) {
confirmSupportOfChecksum(checksumType);
}
channel.write(new DumpBinaryLogCommand(serverId, binlogFilename, binlogPosition));
requestBinaryLogStream();
} catch (IOException e) {
if (channel != null && channel.isOpen()) {
channel.close();
Expand All @@ -328,14 +363,42 @@ public void connect() throws IOException {
if (keepAlive && !isKeepAliveThreadRunning()) {
spawnKeepAliveThread();
}
EventDataDeserializer eventDataDeserializer = eventDeserializer.getEventDataDeserializer(EventType.ROTATE);
if (eventDataDeserializer.getClass() != RotateEventDataDeserializer.class &&
ensureEventDataDeserializer(EventType.ROTATE, RotateEventDataDeserializer.class);
synchronized (gtidSetAccessLock) {
if (gtidSet != null) {
ensureEventDataDeserializer(EventType.GTID, GtidEventDataDeserializer.class);
}
}
listenForEventPackets();
}

private void requestBinaryLogStream() throws IOException {
Command dumpBinaryLogCommand;
synchronized (gtidSetAccessLock) {
if (gtidSet != null) {
dumpBinaryLogCommand = new DumpBinaryLogGtidCommand(serverId, binlogFilename, binlogPosition, gtidSet);
} else {
dumpBinaryLogCommand = new DumpBinaryLogCommand(serverId, binlogFilename, binlogPosition);
}
}
channel.write(dumpBinaryLogCommand);
}

private void ensureEventDataDeserializer(EventType eventType,
Class<? extends EventDataDeserializer> eventDataDeserializerClass) {
EventDataDeserializer eventDataDeserializer = eventDeserializer.getEventDataDeserializer(eventType);
if (eventDataDeserializer.getClass() != eventDataDeserializerClass &&
eventDataDeserializer.getClass() != EventDeserializer.EventDataWrapper.Deserializer.class) {
eventDeserializer.setEventDataDeserializer(EventType.ROTATE,
new EventDeserializer.EventDataWrapper.Deserializer(new RotateEventDataDeserializer(),
EventDataDeserializer internalEventDataDeserializer;
try {
internalEventDataDeserializer = eventDataDeserializerClass.newInstance();
} catch (Exception e) {
throw new RuntimeException(e);
}
eventDeserializer.setEventDataDeserializer(eventType,
new EventDeserializer.EventDataWrapper.Deserializer(internalEventDataDeserializer,
eventDataDeserializer));
}
listenForEventPackets();
}

private void authenticate(String salt, int collation) throws IOException {
Expand Down Expand Up @@ -526,6 +589,7 @@ private void listenForEventPackets() throws IOException {
if (isConnected()) {
notifyEventListeners(event);
updateClientBinlogFilenameAndPosition(event);
updateGtidSet(event);
}
}
} catch (Exception e) {
Expand Down Expand Up @@ -565,6 +629,24 @@ private void updateClientBinlogFilenameAndPosition(Event event) {
}
}

private void updateGtidSet(Event event) {
EventHeader eventHeader = event.getHeader();
if (eventHeader.getEventType() == EventType.GTID) {
synchronized (gtidSetAccessLock) {
if (gtidSet != null) {
EventData eventData = event.getData();
GtidEventData gtidEventData;
if (eventData instanceof EventDeserializer.EventDataWrapper) {
gtidEventData = (GtidEventData) ((EventDeserializer.EventDataWrapper) eventData).getInternal();
} else {
gtidEventData = (GtidEventData) eventData;
}
gtidSet.add(gtidEventData.getGtid());
}
}
}
}

private ResultSetRowPacket[] readResultSet() throws IOException {
List<ResultSetRowPacket> resultSet = new LinkedList<ResultSetRowPacket>();
while ((channel.read())[0] != (byte) 0xFE /* eof */) { /* skip */ }
Expand Down
215 changes: 215 additions & 0 deletions src/main/java/com/github/shyiko/mysql/binlog/GtidSet.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,215 @@
/*
* Copyright 2015 Stanley Shyiko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.shyiko.mysql.binlog;

import java.util.*;

/**
* GTID set as described in <a href="https://dev.mysql.com/doc/refman/5.6/en/replication-gtids-concepts.html">GTID Concepts</a>
* of MySQL 5.6 Reference Manual.
*
* <pre>
* gtid_set: uuid_set[,uuid_set]...
* uuid_set: uuid:interval[:interval]...
* uuid: hhhhhhhh-hhhh-hhhh-hhhh-hhhhhhhhhhhh, h: [0-9|A-F]
* interval: n[-n], (n >= 1)
* </pre>
*
* @author <a href="mailto:stanley.shyiko@gmail.com">Stanley Shyiko</a>
*/
public class GtidSet {

private final Map<String, UUIDSet> map = new LinkedHashMap<String, UUIDSet>();

public GtidSet(String gtidSet) {
String[] uuidSets = gtidSet.isEmpty() ? new String[0] : gtidSet.split(",");
for (String uuidSet : uuidSets) {
int uuidSeparatorIndex = uuidSet.indexOf(":");
String sourceId = uuidSet.substring(0, uuidSeparatorIndex);
List<Interval> intervals = new ArrayList<Interval>();
String[] rawIntervals = uuidSet.substring(uuidSeparatorIndex + 1).split(":");
for (String interval : rawIntervals) {
String[] is = interval.split("-");
long[] split = new long[is.length];
for (int i = 0, e = is.length; i < e; i++) {
split[i] = Long.parseLong(is[i]);
}
if (split.length == 1) {
split = new long[] {split[0], split[0] + 1};
}
intervals.add(new Interval(split[0], split[1]));
}
map.put(sourceId, new UUIDSet(sourceId, intervals));
}
}

public Collection<UUIDSet> getUUIDSets() {
return map.values();
}

/**
* @param gtid GTID ("source_id:transaction_id")
* @return whether or not gtid was added to the set (false if it was already there)
*/
public boolean add(String gtid) {
String[] split = gtid.split(":");
String sourceId = split[0];
long transactionId = Long.parseLong(split[1]);
UUIDSet uuidSet = map.get(sourceId);
if (uuidSet == null) {
map.put(sourceId, uuidSet = new UUIDSet(sourceId, new ArrayList<Interval>()));
}
List<Interval> intervals = (List<Interval>) uuidSet.intervals;
int index = findInterval(intervals, transactionId);
boolean addedToExisting = false;
if (index < intervals.size()) {
Interval interval = intervals.get(index);
if (interval.getStart() == transactionId + 1) {
interval.start = transactionId;
addedToExisting = true;
} else
if (interval.getEnd() == transactionId) {
interval.end = transactionId + 1;
addedToExisting = true;
} else
if (interval.getStart() <= transactionId && transactionId < interval.getEnd()) {
return false;
}
}
if (!addedToExisting) {
intervals.add(index, new Interval(transactionId, transactionId + 1));
}
if (intervals.size() > 1) {
joinAdjacentIntervals(intervals, index);
}
return true;
}

/**
* Collapses intervals like a-b:b-c into a-c (only in index+-1 range).
*/
private void joinAdjacentIntervals(List<Interval> intervals, int index) {
for (int i = Math.min(index + 1, intervals.size() - 1), e = Math.max(index - 1, 0); i > e; i--) {
Interval a = intervals.get(i - 1), b = intervals.get(i);
if (a.getEnd() == b.getStart()) {
a.end = b.end;
intervals.remove(i);
}
}
}

@Override
public String toString() {
List<String> gtids = new ArrayList<String>();
for (UUIDSet uuidSet : map.values()) {
gtids.add(uuidSet.getUUID() + ":" + join(uuidSet.intervals, ":"));
}
return join(gtids, ",");
}

/**
* @return index which is either a pointer to the interval containing v or a position at which v can be added
*/
private static int findInterval(List<Interval> ii, long v) {
int l = 0, p = 0, r = ii.size();
while (l < r) {
p = (l + r) / 2;
Interval i = ii.get(p);
if (i.getEnd() < v) {
l = p + 1;
} else
if (v < i.getStart()) {
r = p;
} else {
return p;
}
}
if (!ii.isEmpty() && ii.get(p).getEnd() < v) {
p++;
}
return p;
}

private String join(Collection o, String delimiter) {
if (o.isEmpty()) {
return "";
}
StringBuilder sb = new StringBuilder();
for (Object o1 : o) {
sb.append(o1).append(delimiter);
}
return sb.substring(0, sb.length() - delimiter.length());
}

public static class UUIDSet {

private String uuid;
private Collection<Interval> intervals;

public UUIDSet(String uuid, Collection<Interval> intervals) {
this.uuid = uuid;
this.intervals = intervals;
}

public String getUUID() {
return uuid;
}

public Collection<Interval> getIntervals() {
return intervals;
}
}

public static class Interval implements Comparable<Interval> {

private long start;
private long end;

public Interval(long start, long end) {
this.start = start;
this.end = end;
}

public long getStart() {
return start;
}

public long getEnd() {
return end;
}

@Override
public String toString() {
return start + "-" + end;
}

@Override
public int compareTo(Interval o) {
return saturatedCast(this.start - o.start);
}

private static int saturatedCast(long value) {
if (value > Integer.MAX_VALUE) {
return Integer.MAX_VALUE;
}
if (value < Integer.MIN_VALUE) {
return Integer.MIN_VALUE;
}
return (int) value;
}
}

}
Loading

0 comments on commit 3f30768

Please sign in to comment.