A Vert.x client for tapping into the MySQL replication stream. Based on Vert.x 3.4.1.
It uses the MySQL Binary Log connector, a Java implementation of the MySQL binlog protocol, to interact with MySQL.
Make sure the binary log is enabled on the MySQL master and is in ROW format; otherwise the client cannot receive any row events.
To enable the binary log, start the server with the --log-bin=base_name option. For example:
mysqld --log-bin=mysql-bin --binlog-format=ROW
To specify the format globally for all clients, set the global value of the binlog_format system variable:
SET GLOBAL binlog_format = 'ROW';
Get the latest JAR(s) from here. Alternatively you can include the following Maven dependency (available through Maven Central):
<dependency>
<groupId>io.github.guoyu511</groupId>
<artifactId>vertx-mysql-binlog</artifactId>
<version>0.2.0</version>
</dependency>
In order to connect to MySQL as a slave, you first need a BinlogClient instance.
You can create a client by specifying a BinlogClientOptions:
BinlogClient binlogClient = BinlogClient.create(vertx, binlogClientOptions);
The BinlogClientOptions contains the following values:

- host: the MySQL server host name. Default is localhost
- port: the MySQL server port. Default is 3306
- username: the username. Default is root
- password: the user's password. Default is null, which means no password
- filename: the binlog file at which to start. Default is the current file on the master
- position: the binlog position at which to start. Default is the current position on the master
- keepAlive: enables the "keep alive" feature on this client. Default is true
- keepAliveInterval: the "keep alive" interval in milliseconds. Default is 1 minute
- heartbeatInterval: the heartbeat interval in milliseconds. Default is 0, which means no heartbeat
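As a sketch, the options above might be configured like this before creating the client. Apart from setFilename and setPosition (which appear later in this document), the setter names and the package name below are assumptions following the usual Vert.x fluent-options conventions; check the actual BinlogClientOptions API before relying on them.

```java
// Assumed package: the Rx variant documented below lives under
// io.vertx.rxjava.ext.binlog.mysql, so the plain client likely mirrors it.
import io.vertx.ext.binlog.mysql.BinlogClientOptions;

// Setter names other than setFilename/setPosition are assumptions based on
// the usual Vert.x fluent-options naming convention.
BinlogClientOptions options = new BinlogClientOptions()
    .setHost("127.0.0.1")           // default: localhost
    .setPort(3306)                  // default: 3306
    .setUsername("repl")            // default: root
    .setPassword("secret")          // default: null (no password)
    .setKeepAlive(true)             // default: true
    .setKeepAliveInterval(60_000L)  // default: 1 minute
    .setHeartbeatInterval(0L);      // default: 0 (no heartbeat)
```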
Make sure the user has the REPLICATION CLIENT privilege for the given schema.
You can then connect to the MySQL master with the connect method.
The connection happens asynchronously, and the client may not be connected until some time after the call has returned:
binlogClient.connect();
You can also supplying a handler which will be called after the connection established (or failed).
binlogClient.connect((ar) -> {
ar.succeeded() // true if connection established
});
Once connected to the MySQL master as a slave, the client can handle events.
There are several types of event defined in the MySQL binlog protocol. This client is only concerned with events related to data modification, such as write, update and delete. All events are presented as JsonObject instances.
You can set a handler on the client to handle those types of events.
binlogClient.handler((event) -> {
String type = event.getString("type");
});
For a data modification event (write / update / delete) the JsonObject will look like this:
{
"type" : "write",
"schema" : "test_db",
"table" : "test_table",
"row" : {
"id" : 1000,
"name" : "guoyu"
}
}
The row event contains the following values:

- type: the event type; one of write, update, delete
- schema: the database in which the data changed
- table: the table in which the row changed
- row: the row data as JSON, with columns as key / value pairs
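To make the event shape concrete, here is a minimal, self-contained sketch of dispatching on the type field. It uses a plain java.util.Map in place of Vert.x's JsonObject purely so the example runs without Vert.x on the classpath; with the real client you would call event.getString("type") and event.getJsonObject("row") instead.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class RowEventDispatch {
    // Dispatch a row event to a type-specific action and return a short
    // description of what was done. The Map stands in for JsonObject here.
    static String dispatch(Map<String, Object> event) {
        String type = (String) event.get("type");
        String table = event.get("schema") + "." + event.get("table");
        @SuppressWarnings("unchecked")
        Map<String, Object> row = (Map<String, Object>) event.get("row");
        switch (type) {
            case "write":  return "insert into " + table + ": " + row;
            case "update": return "update in " + table + ": " + row;
            case "delete": return "delete from " + table + ": " + row;
            default:       return "ignored " + type;
        }
    }

    public static void main(String[] args) {
        // Build the same event shown in the JSON example above.
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("id", 1000);
        row.put("name", "guoyu");
        Map<String, Object> event = new LinkedHashMap<>();
        event.put("type", "write");
        event.put("schema", "test_db");
        event.put("table", "test_table");
        event.put("row", row);
        System.out.println(dispatch(event));
        // prints: insert into test_db.test_table: {id=1000, name=guoyu}
    }
}
```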
Column mapping
The original ROW events sent by the MySQL master contain the column indexes but not the column names.
Therefore, the BinlogClient uses a MySQLClient instance to query the column indexes and names from the information_schema database when it receives a ROW event for a table for the first time. It then caches the column mapping and uses it to build the event objects.
When a DROP TABLE, ALTER TABLE or CREATE TABLE event is received, the cached mappings are cleared and the BinlogClient will re-query the column mapping for subsequent ROW events.
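The caching behaviour described above can be sketched with plain JDK collections. The class below is illustrative only (its name and structure are assumptions, not the client's internals); the injected loader stands in for the information_schema query performed through the MySQLClient.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

public class ColumnMappingCache {
    // Cached column names per "schema.table" key.
    private final Map<String, List<String>> cache = new HashMap<>();
    // Stand-in for the information_schema lookup done via MySQLClient.
    private final Function<String, List<String>> loader;

    ColumnMappingCache(Function<String, List<String>> loader) {
        this.loader = loader;
    }

    // Resolve column names for a table, querying the loader only on a miss.
    List<String> columns(String schema, String table) {
        return cache.computeIfAbsent(schema + "." + table, loader);
    }

    // On DROP/ALTER/CREATE TABLE, throw away everything so the next ROW
    // event re-queries the (possibly changed) column layout.
    void invalidateAll() {
        cache.clear();
    }

    public static void main(String[] args) {
        int[] queries = {0};
        ColumnMappingCache c = new ColumnMappingCache(key -> {
            queries[0]++;
            return Arrays.asList("id", "name");
        });
        c.columns("test_db", "test_table");
        c.columns("test_db", "test_table"); // served from cache
        System.out.println(queries[0]);     // prints 1
        c.invalidateAll();                  // simulate ALTER TABLE
        c.columns("test_db", "test_table"); // re-queried
        System.out.println(queries[0]);     // prints 2
    }
}
```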
The BinlogClient implements the ReadStream<JsonObject> interface, which means all the methods provided by ReadStream are available.
For example, use pause to pause reading (this stops reading from the underlying InputStream):
binlogClient.pause();
Use resume to continue:
binlogClient.resume();
You can even use a Pump to pump the events to another WriteStream:
Pump.pump(binlogClient, targetStream).start();
Or pump the stream to an event bus message producer:
Pump.pump(binlogClient, eventBus.sender("binlog.event")).start();
// handle the event from the event bus
eventBus.consumer("binlog.event", (msg) -> {
JsonObject json = (JsonObject) msg.body();
});
An Rx-ified version of the binlog client is also provided.
To use the Rx-ified API, create the binlog client instance using the BinlogClient interface under the io.vertx.rxjava.ext.binlog.mysql package with the RX version of Vert.x.
import io.vertx.rxjava.ext.binlog.mysql.BinlogClient;
BinlogClient rxBinlogClient = BinlogClient.create(rxVertx, options);
Or wrap an existing client:
BinlogClient rxClient = BinlogClient.newInstance(client);
Then you can use the client as an Observable, for example, to handle all update events:
rxClient.toObservable()
.filter((event) -> "update".equals(event.getString("type")))
.subscribe((event) -> {
// do something with this event
});
Sometimes you need to know the filename and position that the replication stream is currently at.
For example, you may want to save the filename and position when an event arrives.
You can retrieve them using filename and position:
binlogClient.handler((event) -> {
//some event coming
String filename = binlogClient.filename();
long position = binlogClient.position();
//save them in any way for future use
});
The next time you create your client, you can pass the filename and position to the BinlogClientOptions to make the client connect at the specified position.
binlogClientOptions.setFilename(filename);
binlogClientOptions.setPosition(position);
BinlogClient binlogClient = BinlogClient.create(vertx, binlogClientOptions);
This ensures that you will not lose any events.
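One simple way to persist that checkpoint between restarts is a properties file. The sketch below uses only the JDK; the file layout and key names are arbitrary choices for illustration, not part of the client's API.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

public class BinlogCheckpoint {
    // Save the current binlog filename and position so a restarted client
    // can resume where it left off.
    static void save(Path file, String filename, long position) throws IOException {
        Properties p = new Properties();
        p.setProperty("binlog.filename", filename);
        p.setProperty("binlog.position", Long.toString(position));
        try (OutputStream out = Files.newOutputStream(file)) {
            p.store(out, "binlog checkpoint");
        }
    }

    // Load the saved filename and position, e.g. to feed into
    // setFilename/setPosition on the options.
    static String[] load(Path file) throws IOException {
        Properties p = new Properties();
        try (InputStream in = Files.newInputStream(file)) {
            p.load(in);
        }
        return new String[] { p.getProperty("binlog.filename"), p.getProperty("binlog.position") };
    }

    public static void main(String[] args) throws IOException {
        Path tmp = Files.createTempFile("binlog", ".properties");
        save(tmp, "mysql-bin.000003", 4567L);
        String[] cp = load(tmp);
        System.out.println(cp[0] + "@" + cp[1]); // prints mysql-bin.000003@4567
        Files.delete(tmp);
    }
}
```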
You can hold on to the client for a long time (e.g. the life-time of your verticle).
Once you have finished with it, you should close it:
binlogClient.close();
// or close with a callback handler
binlogClient.close((ar) -> {});
You can run tests with a specified MySQL instance:
% mvn test -Dbinlog.host=[host] -Dbinlog.port=[port] -Dbinlog.user=[user] -Dbinlog.password=[password] -Dbinlog.schema=[schema]
The user must have ALL privileges for the given schema.