Skip to content

Commit

Permalink
Add support for optional table metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
ahmedahamid committed Dec 10, 2018
1 parent ddaabfd commit 72876e7
Show file tree
Hide file tree
Showing 4 changed files with 155 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ public class TableMapEventData implements EventData {
private byte[] columnTypes;
private int[] columnMetadata;
private BitSet columnNullability;
private TableMapEventMetadata eventMetadata;

public long getTableId() {
return tableId;
Expand Down Expand Up @@ -77,6 +78,10 @@ public void setColumnNullability(BitSet columnNullability) {
this.columnNullability = columnNullability;
}

public TableMapEventMetadata getEventMetadata() { return eventMetadata; }

public void setEventMetadata(TableMapEventMetadata eventMetadata) { this.eventMetadata = eventMetadata; }

@Override
public String toString() {
final StringBuilder sb = new StringBuilder();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Copyright 2013 Stanley Shyiko
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.github.shyiko.mysql.binlog.event;

import java.util.List;

/**
* @author <a href="mailto:ahmedahamid@yahoo.com">Ahmed Elbahtemy</a>
*/
public class TableMapEventMetadata {

private List<String> columnNames;
private List<String[]> setValues;
private List<String[]> enumValues;

public void setColumnNames(List<String> columnNames) {
this.columnNames = columnNames;
}

public List<String> getColumnNames() {
return columnNames;
}

public void setSetValues(List<String[]> setValues) {
this.setValues = setValues;
}

public List<String[]> getSetValues() {
return setValues;
}

public void setEnumValues(List<String[]> enumValues) {
this.enumValues = enumValues;
}

public List<String[]> getEnumValues() {
return enumValues;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,14 @@
package com.github.shyiko.mysql.binlog.event.deserialization;

import com.github.shyiko.mysql.binlog.event.TableMapEventData;
import com.github.shyiko.mysql.binlog.event.TableMapEventMetadata;
import com.github.shyiko.mysql.binlog.io.ByteArrayInputStream;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* @author <a href="mailto:stanley.shyiko@gmail.com">Stanley Shyiko</a>
Expand All @@ -38,6 +43,7 @@ public TableMapEventData deserialize(ByteArrayInputStream inputStream) throws IO
inputStream.readPackedInteger(); // metadata length
eventData.setColumnMetadata(readMetadata(inputStream, eventData.getColumnTypes()));
eventData.setColumnNullability(inputStream.readBitSet(numberOfColumns, true));
eventData.setEventMetadata(readEventMetadata(inputStream, numberOfColumns));
return eventData;
}

Expand Down Expand Up @@ -74,6 +80,62 @@ private int[] readMetadata(ByteArrayInputStream inputStream, byte[] columnTypes)
return metadata;
}

private static TableMapEventMetadata readEventMetadata(ByteArrayInputStream inputStream,
int columnCount) throws IOException {
if (inputStream.available() <= 0) {
return null;
}

TableMapEventMetadata eventMetadata = new TableMapEventMetadata();
while (inputStream.available() > 0) {
MetadataFieldType fieldType = MetadataFieldType.byCode(inputStream.readInteger(1));
int fieldLength = inputStream.readPackedInteger();

switch (fieldType) {
case COLUMN_NAME:
eventMetadata.setColumnNames(readColumnNames(inputStream.readAsNewStream(fieldLength)));
break;
case SET_STR_VALUE:
eventMetadata.setSetValues(readTypeValues(inputStream.readAsNewStream(fieldLength)));
break;
case ENUM_STR_VALUE:
eventMetadata.setEnumValues(readTypeValues(inputStream.readAsNewStream(fieldLength)));
break;
case SIGNEDNESS:
case DEFAULT_CHARSET:
case COLUMN_CHARSET:
case GEOMETRY_TYPE:
case SIMPLE_PRIMARY_KEY:
case PRIMARY_KEY_WITH_PREFIX:
default:
inputStream.skip(fieldLength);
break;
}
}
return eventMetadata;
}

private static List<String[]> readTypeValues(ByteArrayInputStream inputStream) throws IOException {
List<String[]> valuesList = new ArrayList<String[]>();
while (inputStream.available() > 0) {
List<String> typeValues = new ArrayList<String>();
int setValuesCount = inputStream.readPackedInteger();
for (int i = 0; i < setValuesCount; ++i) {
typeValues.add(inputStream.readLengthEncodedString());
}
valuesList.add(typeValues.toArray(new String[typeValues.size()]));
}
return valuesList;
}

private static List<String> readColumnNames(ByteArrayInputStream inputStream) throws IOException {
List<String> columnNames = new ArrayList<String>();
while (inputStream.available() > 0) {
columnNames.add(inputStream.readLengthEncodedString());
}
return columnNames;
}

private static int bigEndianInteger(byte[] bytes, int offset, int length) {
int result = 0;
for (int i = offset; i < (offset + length); i++) {
Expand All @@ -83,4 +145,36 @@ private static int bigEndianInteger(byte[] bytes, int offset, int length) {
return result;
}

private enum MetadataFieldType {
SIGNEDNESS(1), // UNSIGNED flag of numeric columns
DEFAULT_CHARSET(2), // Default character set of string columns
COLUMN_CHARSET(3), // Character set of string columns
COLUMN_NAME(4),
SET_STR_VALUE(5), // String code of SET columns
ENUM_STR_VALUE(6), // String code of ENUM columns
GEOMETRY_TYPE(7), // Real type of geometry columns
SIMPLE_PRIMARY_KEY(8), // Primary key without prefix
PRIMARY_KEY_WITH_PREFIX(9); // Primary key with prefix

private final int code;

private MetadataFieldType(int code) {
this.code = code;
}

public int getCode() { return code; }

private static final Map<Integer, MetadataFieldType> INDEX_BY_CODE;

static {
INDEX_BY_CODE = new HashMap<Integer, MetadataFieldType>();
for (MetadataFieldType fieldType : values()) {
INDEX_BY_CODE.put(fieldType.code, fieldType);
}
}

public static MetadataFieldType byCode(int code) {
return INDEX_BY_CODE.get(code);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,10 @@ public byte[] read(int length) throws IOException {
return bytes;
}

public ByteArrayInputStream readAsNewStream(int length) throws IOException {
return new ByteArrayInputStream(read(length));
}

public void fill(byte[] bytes, int offset, int length) throws IOException {
int remaining = length;
while (remaining != 0) {
Expand Down

0 comments on commit 72876e7

Please sign in to comment.