-
Notifications
You must be signed in to change notification settings - Fork 2
Replication from MySQL to Kafka
License
TiVo/wombat
Folders and files
Name | Name | Last commit message | Last commit date | |
---|---|---|---|---|
Repository files navigation
Overview -------- Wombat is a program that replicates data from MySQL to Apache Kafka. It replicates row-level data in near real-time. It uses the MySQL replication protocol to speak to a MySQL server and act as a MySQL replication follower. Wombat receives a stream of row-level changes, and writes each change to Kafka. If a row is inserted/updated, Wombat sends the row's new data to Kafka. If a row is deleted, Wombat sends a delete marker to Kafka. Key features of Wombat: * Works with any MySQL/MariaDB-compatible server (tested against MariaDB 5.5 and MariaDB 10.0 (haven't done a lot of testing). * Safe. It replicates from the master MySQL database with just read-only, replication-only permissions. (need to test this) * Low impact on performance. It replicates asynchronously. * Transactionally consistent output. Writes only appear if they have been successfully committed to MySQL; aborted transactions do not emit messages. Writes appear in the same order as they were committed to MySQL. * Fault-tolerant. If Wombat crashes, it will resume replication at the point where it failed. It will not lose any data. Demo ---- [See demo_video.mov file] Building -------- To compile wombat: $ ./gradlew build fatJar To build the docker quickstart images: $ docker build -f replicator/Dockerfile.quickstart -t wombat-quickstart replicator $ docker build -t mariadb-quickstart mariadb-quickstart Runtime requirements -------------------- MySQL configuration requirements: * row-based binlog replication enabled (https://dev.mysql.com/doc/refman/5.1/en/replication-howto-masterbaseconfig.html) * --binlog-row-image=full (http://dev.mysql.com/doc/refman/5.6/en/replication-options-binary-log.html#sysvar_binlog_row_image) * all tables must have primary keys * Only replicates file/offset replication, does not replicate systems using GTIDs. Quickstart ---------- The easiest way to start playing with Wombat is to use the pre-built and pre-configured Docker images. They have a hardcoded configuration and are not intended for production use. $ docker run -d --name zookeeper --hostname zookeeper confluent/zookeeper $ docker run -d --name kafka --hostname kafka --link zookeeper:zookeeper --env KAFKA_LOG_CLEANUP_POLICY=compact confluent/kafka $ docker run -d --name mariadb mariadb-quickstart # run wombat $ docker run --rm -it --link kafka:kafka --link zookeeper:zookeeper --link mariadb:mariadb --name wombat wombat-quickstart # interact. consume from kafka. $ docker run -it --rm --link zookeeper:zookeeper --link kafka:kafka confluent/tools /bin/bash -c '/usr/bin/kafka-console-consumer --zookeeper ${ZOOKEEPER_PORT_2181_TCP_ADDR}:${ZOOKEEPER_PORT_2181_TCP_PORT} --topic prefix.test.users --property print.key=true --from-beginning' # insert some data $ docker run --rm -it --link mariadb:mariadb mariadb:5.5 sh -c 'mysql --user=root --password=${MARIADB_ENV_MYSQL_ROOT_PASSWORD} --host=${MARIADB_PORT_3306_TCP_ADDR} --port=${MARIADB_PORT_3306_TCP_PORT}' MariaDB [(none)]> create database test; MariaDB [(none)]> create table test.users (userId int auto_increment primary key, name char(128)); MariaDB [(none)]> insert into test.users (name) values ("James"); MariaDB [(none)]> update test.users set name = "James Cheng" where name = "James"; MariaDB [(none)]> delete from test.users where name = "James Cheng"; # clean up $ docker rm -f wombat mariadb kafka zookeeper Consuming --------- Wombat encodes the row data using JSON, and then wraps it in an HTTP envelope. The envelope headers are used for monitoring replication progress. Each table is replicated to a different Kafka topic. Given the following table structure: CREATE TABLE users (userId INT AUTO_INCREMENT PRIMARY KEY, name CHAR(128)); Inserts: INSERT INTO users (name) VALUES ("James"); BL/1 143 27 ServerId: mysqlhost.domainname BinlogEventStartPosition: 8288 BinlogFilename: mariadb-bin.000008 BinlogTransactionStartPosition: 8173 {"name":"James","userId":1} Updates: UPDATE users SET name = "James Cheng" WHERE name="James"; BL/1 143 33 ServerId: mysqlhost.domainname BinlogEventStartPosition: 8470 BinlogFilename: mariadb-bin.000008 BinlogTransactionStartPosition: 8355 {"name":"James Cheng","userId":1} Deletes: DELETE FROM users WHERE name = "James Cheng"; null Deletes are represented as a message with a null payload. Status ------ Wombat is pre-alpha software. It has only been tested against a limit set of inputs. A lot of thought has been put into its design, but not everything is implemented. I am looking for feedback to help validate the design and guide the implementation. Caveats/issues -------------- * MySQL schema changes are not handled. * Not all MySQL data types are handled properly. * Not tolerant to disk failure (it saves its MySQL replication position into a local file). * Errors are not handled. Since it is still in development mode, it prints errors to stderr and System.exit()'s whenever it encounters an error. Inspiration ----------- Databus - https://github.com/linkedin/databus License ------- Copyright 2015 TiVo Inc. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this software except in compliance with the License in the enclosed file called LICENSE. Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. Contributing ------------ See CONTRIBUTING.md Running manually ---------------- How to run it manually, without docker. Wombat takes configuration from two config files that are passed in at startup. $ cat config.properties MYSQL_SOURCE_IDENTIFIER=myServerId MYSQL_SOURCE_HOST=192.168.1.1 MYSQL_SOURCE_PORT=3306 MYSQL_SOURCE_USER=replication-user MYSQL_SOURCE_PASSWORD=replication-password KAFKA_BROKER=192.168.1.1 KAFKA_PORT=9092 KAFKA_TOPIC_BASE=topic.prefix $ cat checkpoint.txt BINLOG_FILENAME=mariadb-bin.000003 BINLOG_POSITION=4 $ export CONFIG_FILE=config.properties $ export CHECKPOINT_FILE=checkpoint.txt $ java -jar replication/build/libs/replication.jar Configuration ------------- MYSQL_SOURCE_IDENTIFIER An arbitrary string that will be added to each message that originates from this MySQL server MYSQL_SOURCE_HOST Hostname or IP address of the MySQL server to replicate from. MYSQL_SOURCE_PORT The port that MySQL listens on MYSQL_SOURCE_USER The username used to connect to MySQL MYSQL_SOURCE_PASSWORD The password used to connect to MySQL SERVER_ID MySQL's server id setting KAFKA_BROKER IP/Hostname of one of the Kafka brokers KAFKA_PORT Port of the Kafka broker KAFKA_TOPIC_BASE An arbitrary string. A MySQL database and table nane are appended to this string to determine to which topic that table's rows should be sent. For example, a prefix of "server1" could be chosen and a table db.users would be written to the topic "server1.db.users"
About
Replication from MySQL to Kafka
Resources
License
Stars
Watchers
Forks
Releases
No releases published
Packages 0
No packages published