Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MySQL 5.6 support #20

Merged
merged 15 commits into from
Jul 18, 2016
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,4 @@
/tmp/
.env
.vagrant
*.retry
15 changes: 13 additions & 2 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,24 +1,35 @@
sudo: true

dist: trusty
addons:
apt:
packages:
- mysql-server-5.6
- mysql-client-core-5.6
- mysql-client-5.6
- haveged # Extra entropy

language: ruby

rvm:
- jruby-9.0.5.0
- jruby-9.0.5.0

services:
- mysql

before_install:
- sudo service haveged start # Extra entropy too ensure quick start time for JRuby
- printf "[mysqld]\nlog-bin=mysql-bin\nserver-id=1\nbinlog-format=ROW\n" | sudo tee /etc/mysql/conf.d/binlog.cnf
- sudo service mysql restart
- mysql -u root -e 'create database ecco_test;'
- gem install bundler -v 1.10.6
- gem install bundler

env:
global:
- DATABASE_USER=root
- DATABASE_PASS=""
- DATABASE_URL=jdbc:mysql://localhost:3306/ecco_test
- JRUBY_OPTS='--client -J-XX:+TieredCompilation -J-XX:TieredStopAtLevel=1 -Xcext.enabled=false -J-Xss2m -Xcompile.invokedynamic=false'

script: bin/all_specs

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

[![Build Status](https://travis-ci.org/twingly/ecco.svg?branch=master)](https://travis-ci.org/twingly/ecco)

MySQL replication binlog parser using [mysql-binlog-connector-java].
MySQL (version 5.5 and 5.6) replication binlog parser using [mysql-binlog-connector-java].

## Installation

Expand Down
3 changes: 3 additions & 0 deletions Vagrantfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,8 @@ Vagrant.configure("2") do |config|
end

ubuntu.vm.network :forwarded_port, host: 3306, guest: 3306
ubuntu.vm.provider "virtualbox" do |vbox|
vbox.memory = 1024
end
end
end
25 changes: 19 additions & 6 deletions lib/ecco/row_event_listener.rb
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@

module Ecco
class RowEventListener < EventListener
ROW_EVENTS = [
EventType::WRITE_ROWS,
EventType::UPDATE_ROWS,
EventType::DELETE_ROWS,
]
# MySQL v1 and v2 row events
WRITE_EVENTS = [EventType::WRITE_ROWS, EventType::EXT_WRITE_ROWS]
UPDATE_EVENTS = [EventType::UPDATE_ROWS, EventType::EXT_UPDATE_ROWS]
DELETE_EVENTS = [EventType::DELETE_ROWS, EventType::EXT_DELETE_ROWS]

ROW_EVENTS = WRITE_EVENTS + UPDATE_EVENTS + DELETE_EVENTS

def table_event
EventType::TABLE_MAP
Expand All @@ -26,9 +27,9 @@ def on_event(event)
@table_map_event = event
when *accepted_events
row_event = Ecco::RowEvent.new
row_event.type = type.to_s
row_event.table_id = data.get_table_id
row_event.rows = data.rows
row_event.type = row_type_to_string(type)

if @table_map_event
table_event_data = @table_map_event.get_data
Expand All @@ -40,5 +41,17 @@ def on_event(event)
@callback.call(row_event)
end
end

private

def row_type_to_string(type)
if WRITE_EVENTS.include?(type)
"WRITE_ROWS"
elsif UPDATE_EVENTS.include?(type)
"UPDATE_ROWS"
elsif DELETE_EVENTS.include?(type)
"DELETE_ROWS"
end
end
end
end
3 changes: 3 additions & 0 deletions lib/ecco/save_event_listener.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@ class SaveEventListener < EventListener
EventType::QUERY,
EventType::ROTATE,
EventType::WRITE_ROWS,
EventType::EXT_WRITE_ROWS,
EventType::UPDATE_ROWS,
EventType::EXT_UPDATE_ROWS,
EventType::DELETE_ROWS,
EventType::EXT_DELETE_ROWS,
]

def accepted_events
Expand Down
6 changes: 4 additions & 2 deletions spec/event_context.rb
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
java_import com.github.shyiko.mysql.binlog.event.EventType

shared_context "event" do
let(:table_id) { 1 }
let(:rows) { double("List") }
let(:database) { "some_database" }
let(:table) { "some_table" }
let(:table_event_type) { "event_type_table_map" }
let(:row_event_type) { "event_type_write_rows" }
let(:table_event_type) { EventType::TABLE_MAP }
let(:row_event_type) { EventType::EXT_WRITE_ROWS }

let(:table_event) do
event = double("Event")
Expand Down
4 changes: 3 additions & 1 deletion spec/integration/client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,9 @@

DatabaseHelper.insert(table_name, mysql_row)

sleep 0.1 while event_order.count < events_to_wait_for
TestHelper.with_timeout do
sleep 0.1 while event_order.count < events_to_wait_for
end
subject.stop
event_order.last(2)
end
Expand Down
96 changes: 94 additions & 2 deletions spec/lib/ecco/row_event_listener_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@

subject { described_class.new(client) }

it "should return a row event with correct values" do
before do
allow(subject).to receive(:accepted_events) { [row_event_type] }
allow(subject).to receive(:table_event) { table_event_type }
end

it "should return a row event with correct values" do
actual_type = nil
actual_table_id = 0
actual_rows = nil
Expand All @@ -26,11 +28,101 @@
subject.on_event(table_event)
subject.on_event(row_event)

expect(actual_type).to eq(row_event_type)
expect(actual_type).not_to be_empty
expect(actual_table_id).to eq(table_id)
expect(actual_rows).to eq(rows)
expect(actual_database).to eq(database)
expect(actual_table).to eq(table)
end

describe "RowEvent#type" do
context "MySQL 5.5 v1 ROW_EVENTS" do
context "WRITE_ROWS" do
let(:row_event_type) { EventType::WRITE_ROWS }
it "should return WRITE_ROWS" do
actual_type = nil
subject.callback = Proc.new do |row_event|
actual_type = row_event.type
end

subject.on_event(row_event)

expect(actual_type).to eq("WRITE_ROWS")
end
end

context "UPDATE_ROWS" do
let(:row_event_type) { EventType::UPDATE_ROWS }
it "should return UPDATE_ROWS" do
actual_type = nil
subject.callback = Proc.new do |row_event|
actual_type = row_event.type
end

subject.on_event(row_event)

expect(actual_type).to eq("UPDATE_ROWS")
end
end

context "DELETE_ROWS" do
let(:row_event_type) { EventType::DELETE_ROWS }
it "should return DELETE_ROWS" do
actual_type = nil
subject.callback = Proc.new do |row_event|
actual_type = row_event.type
end

subject.on_event(row_event)

expect(actual_type).to eq("DELETE_ROWS")
end
end
end

context "MySQL 5.6 v2 ROW_EVENTS" do
context "EXT_WRITE_ROWS" do
let(:row_event_type) { EventType::EXT_WRITE_ROWS }
it "should return WRITE_ROWS" do
actual_type = nil
subject.callback = Proc.new do |row_event|
actual_type = row_event.type
end

subject.on_event(row_event)

expect(actual_type).to eq("WRITE_ROWS")
end
end

context "EXT_UPDATE_ROWS" do
let(:row_event_type) { EventType::EXT_UPDATE_ROWS }
it "should return UPDATE_ROWS" do
actual_type = nil
subject.callback = Proc.new do |row_event|
actual_type = row_event.type
end

subject.on_event(row_event)

expect(actual_type).to eq("UPDATE_ROWS")
end
end

context "EXT_DELETE_ROWS" do
let(:row_event_type) { EventType::EXT_DELETE_ROWS }
it "should return DELETE_ROWS" do
actual_type = nil
subject.callback = Proc.new do |row_event|
actual_type = row_event.type
end

subject.on_event(row_event)

expect(actual_type).to eq("DELETE_ROWS")
end
end
end
end
end
end
4 changes: 2 additions & 2 deletions vagrant/playbook.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
---
- hosts: all
sudo: yes
become: yes
tasks:

- name: Update package cache
Expand All @@ -12,7 +12,7 @@
- python-mysqldb

- name: Install mysql server
apt: name=mysql-server state=present
apt: name=mysql-server-5.6 state=present

- name: Copy mysql config file
copy: src=files/ecco-my.cnf dest=/etc/mysql/conf.d/
Expand Down