Table of Contents(中文说明)
- 1. Redis-replicator
- 2. Install
- 3. Simple usage
- 4. Advanced topics
- 5. Other topics
- 6. Contributors
- 7. References
- 8. Supported by
Redis Replicator implement Redis Replication protocol written in java. It can parse, filter, broadcast the RDB and AOF events in a real time manner. It also can synchronize redis data to your local cache or to database. The following I mentioned Command
which means Writable Command
(e.g. set
,hmset
) in Redis and excludes the Readable Command
(e.g. get
,hmget
)
479688557
jdk 1.7+
maven-3.2.3+
redis 2.6 - 4.0.x
<dependency>
<groupId>com.moilioncircle</groupId>
<artifactId>redis-replicator</artifactId>
<version>2.4.6</version>
</dependency>
$mvn clean install package -Dmaven.test.skip=true
redis version | redis-replicator version |
---|---|
[2.6, 4.0.x] | [2.3.0, ] |
[2.6, 4.0-RC3] | [2.1.0, 2.2.0] |
[2.6, 3.2.x] | [1.0.18](not supported) |
Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379");
replicator.addRdbListener(new RdbListener.Adaptor() {
@Override
public void handle(Replicator replicator, KeyValuePair<?> kv) {
System.out.println(kv);
}
});
replicator.addCommandListener(new CommandListener() {
@Override
public void handle(Replicator replicator, Command command) {
System.out.println(command);
}
});
replicator.open();
Replicator replicator = new RedisReplicator("redis:///path/to/dump.rdb");
replicator.addRdbListener(new RdbListener.Adaptor() {
@Override
public void handle(Replicator replicator, KeyValuePair<?> kv) {
System.out.println(kv);
}
});
replicator.open();
Replicator replicator = new RedisReplicator("redis:///path/to/appendonly.aof");
replicator.addCommandListener(new CommandListener() {
@Override
public void handle(Replicator replicator, Command command) {
System.out.println(command);
}
});
replicator.open();
[RDB file][AOF tail]
aof-use-rdb-preamble yes
final Replicator replicator = new RedisReplicator("redis:///path/to/appendonly.aof");
replicator.addRdbListener(new RdbListener.Adaptor() {
@Override
public void handle(Replicator replicator, KeyValuePair<?> kv) {
System.out.println(kv);
}
});
replicator.addCommandListener(new CommandListener() {
@Override
public void handle(Replicator replicator, Command command) {
System.out.println(command);
}
});
replicator.open();
See examples
public static class YourAppendCommand implements Command {
private final String key;
private final String value;
public YourAppendCommand(String key, String value) {
this.key = key;
this.value = value;
}
public String getKey() {
return key;
}
public String getValue() {
return value;
}
@Override
public String toString() {
return "YourAppendCommand{" +
"key='" + key + '\'' +
", value='" + value + '\'' +
'}';
}
}
}
public class YourAppendParser implements CommandParser<YourAppendCommand> {
@Override
public YourAppendCommand parse(Object[] command) {
return new YourAppendCommand(new String((byte[]) command[1], UTF_8), new String((byte[]) command[2], UTF_8));
}
}
Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379");
replicator.addCommandParser(CommandName.name("APPEND"),new YourAppendParser());
replicator.addCommandListener(new CommandListener() {
@Override
public void handle(Replicator replicator, Command command) {
if(command instanceof YourAppendCommand){
YourAppendCommand appendCommand = (YourAppendCommand)command;
// your code goes here
}
}
});
See CommandExtensionExample.java
$cd /path/to/redis-4.0-rc2/src/modules
$make
loadmodule /path/to/redis-4.0-rc2/src/modules/hellotype.so
public class HelloTypeModuleParser implements ModuleParser<HelloTypeModule> {
@Override
public HelloTypeModule parse(RedisInputStream in, int version) throws IOException {
DefaultRdbModuleParser parser = new DefaultRdbModuleParser(in);
int elements = parser.loadUnsigned(version).intValue();
long[] ary = new long[elements];
int i = 0;
while (elements-- > 0) {
ary[i++] = parser.loadSigned(version);
}
return new HelloTypeModule(ary);
}
}
public class HelloTypeModule implements Module {
private final long[] value;
public HelloTypeModule(long[] value) {
this.value = value;
}
public long[] getValue() {
return value;
}
@Override
public String toString() {
return "HelloTypeModule{" +
"value=" + Arrays.toString(value) +
'}';
}
}
public class HelloTypeParser implements CommandParser<HelloTypeCommand> {
@Override
public HelloTypeCommand parse(Object[] command) {
String key = new String((byte[]) command[1], Constants.UTF_8);
long value = Long.parseLong(new String((byte[]) command[2], Constants.UTF_8));
return new HelloTypeCommand(key, value);
}
}
public class HelloTypeCommand implements Command {
private final String key;
private final long value;
public long getValue() {
return value;
}
public String getKey() {
return key;
}
public HelloTypeCommand(String key, long value) {
this.key = key;
this.value = value;
}
@Override
public String toString() {
return "HelloTypeCommand{" +
"key='" + key + '\'' +
", value=" + value +
'}';
}
}
public static void main(String[] args) throws IOException {
Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379");
replicator.addCommandParser(CommandName.name("hellotype.insert"), new HelloTypeParser());
replicator.addModuleParser("hellotype", 0, new HelloTypeModuleParser());
replicator.addRdbListener(new RdbListener.Adaptor() {
@Override
public void handle(Replicator replicator, KeyValuePair<?> kv) {
if (kv instanceof KeyStringValueModule) {
System.out.println(kv);
}
}
});
replicator.addCommandListener(new CommandListener() {
@Override
public void handle(Replicator replicator, Command command) {
if (command instanceof HelloTypeCommand) {
System.out.println(command);
}
}
});
replicator.open();
}
See ModuleExtensionExample.java
- Extends
RdbVisitor
- Register your
RdbVisitor
toReplicator
usingsetRdbVisitor
method.
| full resynchronization | partial resynchronization |
↓-----------<--------------<-------------<----------<-----↓--------------<--------------↑
↓ ↓ ↑ <-reconnect
connect->------->-------------->------------->---------->-------------------->--------------x <-disconnect
↓ ↓ ↓ ↓ ↓
prefullsync auxfields... rdbs... postfullsync cmds...
Before redis-replicator-2.4.0, We construct RedisReplicator
like the following:
Replicator replicator = new RedisReplicator("127.0.0.1", 6379, Configuration.defaultSetting());
Replicator replicator = new RedisReplicator(new File("/path/to/dump.rdb", FileType.RDB, Configuration.defaultSetting());
Replicator replicator = new RedisReplicator(new File("/path/to/appendonly.aof", FileType.AOF, Configuration.defaultSetting());
Replicator replicator = new RedisReplicator(new File("/path/to/appendonly.aof", FileType.MIXED, Configuration.defaultSetting());
After redis-replicator-2.4.0, We introduced a new concept(Redis URI) which simplify the constructor of RedisReplicator
.
Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379");
Replicator replicator = new RedisReplicator("redis:///path/to/dump.rdb");
Replicator replicator = new RedisReplicator("redis:///path/to/appendonly.aof");
// configuration setting example
Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379?authPassword=foobared&readTimeout=10000&ssl=yes");
Replicator replicator = new RedisReplicator("redis:///path/to/dump.rdb?rateLimit=1000000");
commands | commands | commands | commands | commands | commands |
---|---|---|---|---|---|
PING | APPEND | SET | SETEX | MSET | DEL |
SADD | HMSET | HSET | LSET | EXPIRE | EXPIREAT |
GETSET | HSETNX | MSETNX | PSETEX | SETNX | SETRANGE |
HDEL | UNLINK | SREM | LPOP | LPUSH | LPUSHX |
LRem | RPOP | RPUSH | RPUSHX | ZREM | ZINTERSTORE |
INCR | DECR | INCRBY | PERSIST | SELECT | FLUSHALL |
FLUSHDB | HINCRBY | ZINCRBY | MOVE | SMOVE | BRPOPLPUSH |
PFCOUNT | PFMERGE | SDIFFSTORE | RENAMENX | PEXPIREAT | SINTERSTORE |
ZADD | BITFIELD | SUNIONSTORE | RESTORE | LINSERT | ZREMRANGEBYLEX |
GEOADD | PEXPIRE | ZUNIONSTORE | EVAL | SCRIPT | ZREMRANGEBYRANK |
PUBLISH | BITOP | SETBIT | SWAPDB | PFADD | ZREMRANGEBYSCORE |
RENAME | MULTI | EXEC | LTRIM | RPOPLPUSH | SORT |
- Adjust redis server setting like the following. more details please refer to redis.conf
client-output-buffer-limit slave 0 0 0
WARNNING: this setting may run out of memory of redis server in some cases.
- Set log level to debug
- If you are using log4j2, add logger like the following:
<Logger name="com.moilioncircle" level="debug">
<AppenderRef ref="YourAppender"/>
</Logger>
Configuration.defaultSetting().setVerbose(true);
// redis uri
"redis://127.0.0.1:6379?verbose=yes"
System.setProperty("javax.net.ssl.trustStore", "/path/to/truststore");
System.setProperty("javax.net.ssl.trustStorePassword", "password");
System.setProperty("javax.net.ssl.trustStoreType", "your_type");
Configuration.defaultSetting().setSsl(true);
//optional setting
Configuration.defaultSetting().setSslSocketFactory(sslSocketFactory);
Configuration.defaultSetting().setSslParameters(sslParameters);
Configuration.defaultSetting().setHostnameVerifier(hostnameVerifier);
Configuration.defaultSetting().setAuthPassword("foobared");
// redis uri
"redis://127.0.0.1:6379?authPassword=foobared"
- Adjust redis server setting like the following
repl-backlog-size
repl-backlog-ttl
repl-ping-slave-periods
repl-ping-slave-period
MUST less than Configuration.getReadTimeout()
, default Configuration.getReadTimeout()
is 30 seconds
Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379");
final long start = System.currentTimeMillis();
final AtomicInteger acc = new AtomicInteger(0);
replicator.addRdbListener(new RdbListener() {
@Override
public void preFullSync(Replicator replicator) {
System.out.println("pre full sync");
}
@Override
public void handle(Replicator replicator, KeyValuePair<?> kv) {
acc.incrementAndGet();
}
@Override
public void postFullSync(Replicator replicator, long checksum) {
long end = System.currentTimeMillis();
System.out.println("time elapsed:" + (end - start));
System.out.println("rdb event count:" + acc.get());
}
});
replicator.open();
- For any
KeyValuePair
type exceptKeyStringValueModule
, we can get the raw bytes. In some cases(e.g. HyperLogLog),this is very useful.
Replicator replicator = new RedisReplicator("redis://127.0.0.1:6379");
replicator.addRdbListener(new RdbListener.Adaptor() {
@Override
public void handle(Replicator replicator, KeyValuePair<?> kv) {
if (kv instanceof KeyStringValueString) {
KeyStringValueString ksvs = (KeyStringValueString) kv;
byte[] rawValue = ksvs.getRawValue();
// handle raw bytes value
} else if (kv instanceof KeyStringValueHash) {
KeyStringValueHash ksvh = (KeyStringValueHash) kv;
Map<byte[], byte[]> rawValue = ksvh.getRawValue();
// handle raw bytes value
} else {
...
}
}
});
replicator.open();
For easy operation, the key of return type Map<byte[], byte[]>
of KeyStringValueHash.getRawValue
, we can get
and put
the key as value type
KeyStringValueHash ksvh = (KeyStringValueHash) kv;
Map<byte[], byte[]> rawValue = ksvh.getRawValue();
byte[] value = new byte[]{2};
rawValue.put(new byte[]{1}, value);
System.out.println(rawValue.get(new byte[]{1}) == value) //will print true
Commands also support raw bytes.
SetCommand set = (SetCommand) command;
byte[] rawKey = set.getRawKey();
byte[] rawValue = set.getRawValue();
According to 4.3. Write your own rdb parser, This tool built in an Iterable Rdb Parser so that handle huge key value pair.
More details please refer to:
[1] HugeKVFileExample.java
[2] HugeKVSocketExample.java
- Leon Chen
- Adrian Yao
- Trydofor
- Argun
- Sean Pan
- Special thanks to Kevin Zheng
YourKit is kindly supporting this open source project with its full-featured Java Profiler.
YourKit, LLC is the creator of innovative and intelligent tools for profiling
Java and .NET applications. Take a look at YourKit's leading software products:
YourKit Java Profiler and
YourKit .NET Profiler.
IntelliJ IDEA is a Java integrated development environment (IDE) for developing computer software.
It is developed by JetBrains (formerly known as IntelliJ), and is available as an Apache 2 Licensed community edition,
and in a proprietary commercial edition. Both can be used for commercial development.
Redisson is Redis based In-Memory Data Grid for Java offers distributed objects and services (BitSet
, Set
, Multimap
, SortedSet
, Map
, List
, Queue
, BlockingQueue
, Deque
, BlockingDeque
, Semaphore
, Lock
, AtomicLong
, CountDownLatch
, Publish / Subscribe
, Bloom filter
, Remote service
, Spring cache
, Executor service
, Live Object service
, Scheduler service
) backed by Redis server. Redisson provides more convenient and easiest way to work with Redis. Redisson objects provides a separation of concern, which allows you to keep focus on the data modeling and application logic.