Skip to content

Commit

Permalink
1.支持动态配置为本地缓存使用;2.支持worker健康检测及降级策略;
Browse files Browse the repository at this point in the history
  • Loading branch information
gaoyf committed Jul 22, 2021
1 parent 5837844 commit 01c9167
Showing 27 changed files with 593 additions and 89 deletions.
2 changes: 1 addition & 1 deletion client-parent/annotation/pom.xml
Original file line number Diff line number Diff line change
@@ -4,7 +4,7 @@
<parent>
<groupId>com.hotcaffeine</groupId>
<artifactId>client-parent</artifactId>
<version>1.0</version>
<version>1.1</version>
</parent>

<artifactId>annotation</artifactId>
2 changes: 1 addition & 1 deletion client-parent/client/pom.xml
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@
<parent>
<groupId>com.hotcaffeine</groupId>
<artifactId>client-parent</artifactId>
<version>1.0</version>
<version>1.1</version>
</parent>

<artifactId>client</artifactId>
Original file line number Diff line number Diff line change
@@ -121,8 +121,9 @@ public ValueModel getValueModel(String key, boolean useAsCache) {
return null;
}
ValueModel value = localCache.get(key);
// worker不可达降级为caffeine
if (useAsCache && hotCaffeineDetector.getNettyClient().isWorkerUnreachable()) {
// 配置使用本地缓存或worker不健康降级为caffeine
if (useAsCache
&& (keyRule.isUseAsLocalCache() || !hotCaffeineDetector.getWorkerHealthDetector().isHealthy())) {
if (value == null) {
value = new ValueModel();
localCache.set(key, new ValueModel());
Original file line number Diff line number Diff line change
@@ -18,6 +18,7 @@
import com.hotcaffeine.client.listener.WorkerChangeListener;
import com.hotcaffeine.client.netty.NettyClient;
import com.hotcaffeine.client.push.HashGroupPusher;
import com.hotcaffeine.client.worker.HealthDetector;
import com.hotcaffeine.common.etcd.DefaultEtcdConfig;
import com.hotcaffeine.common.etcd.IEtcdConfig;
import com.hotcaffeine.common.model.KeyCount;
@@ -76,6 +77,9 @@ public class HotCaffeineDetector {
// 本地热键检测缓存
private Cache<String, AtomicInteger> hotDetectCache;

// worker健康探测
private HealthDetector workerHealthDetector;

/**
* 一个app一个实例
*
@@ -86,6 +90,8 @@ public HotCaffeineDetector(IEtcdConfig etcdConfig) {
throw new IllegalArgumentException("Only one instance in jvm!");
}
this.appName = etcdConfig.getUser();
// worker健康探测
this.workerHealthDetector = new HealthDetector();
// 创建netty客户端
this.nettyClient = new NettyClient(appName);
// 创建规则缓存器
@@ -114,7 +120,10 @@ public void start() {
this.appEtcdClient.start();
// initLocalDetectCache
initLocalDetectCache();
ClientLogger.getLogger().info("appName:{} started", appName);
// workerHealthDetector
workerHealthDetector.setHotCaffeineDetector(this);
workerHealthDetector.start();
ClientLogger.getLogger().info("HotCaffeineDetector:{} started", appName);
} catch (Throwable e) {
ClientLogger.getLogger().error("appName:{} start error", appName, e);
}
@@ -378,4 +387,8 @@ public static HotCaffeineDetector getInstance() {
public NettyClient getNettyClient() {
return nettyClient;
}

public HealthDetector getWorkerHealthDetector() {
return workerHealthDetector;
}
}
Original file line number Diff line number Diff line change
@@ -13,9 +13,12 @@
import com.hotcaffeine.common.model.ValueModel;
import com.hotcaffeine.common.util.ClientLogger;
import com.hotcaffeine.common.util.MetricsUtil;
import com.hotcaffeine.common.util.NettyUtil;

import io.etcd.jetcd.shaded.com.google.common.eventbus.AllowConcurrentEvents;
import io.etcd.jetcd.shaded.com.google.common.eventbus.Subscribe;
import io.etcd.jetcd.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder;
import io.netty.channel.Channel;

/**
* 客户端监听到有newKey事件
@@ -43,16 +46,11 @@ public ReceiveNewKeyListener(HotCaffeineDetector hotCaffeineDetector) {
@Subscribe
@AllowConcurrentEvents
public void newKeyComing(ReceiveNewKeyEvent event) {
long now = System.currentTimeMillis();
KeyCount keyCount = event.getKeyCount();
if (keyCount == null) {
return;
}
// 收到新key推送
newKey(keyCount);
}

public void newKey(KeyCount keyCount) {
long now = System.currentTimeMillis();
String key = keyCount.getKey();
if (key == null || key.length() == 0) {
ClientLogger.getLogger().warn("the keyCount:{} key is blank", keyCount);
@@ -64,12 +62,18 @@ public void newKey(KeyCount keyCount) {
ClientLogger.getLogger().warn("the key:{} time difference:{}, now:{} keyCreateAt:{}", key,
(now - keyCount.getCreateTime()), now, keyCount.getCreateTime());
}
// 获取缓存
// 内部key
if (keyCount.isInner()) {
hotCaffeineDetector.getWorkerHealthDetector().receiveHotKey(NettyUtil.parseRemoteAddr(event.getChannel()), keyCount);
return;
}
// 获取规则
KeyRule keyRule = hotCaffeineDetector.getKeyRuleCacher().findRule(key);
if (keyRule == null) {
ClientLogger.getLogger().warn("keyCount:{} keyRule is null", keyCount);
return;
}
// 获取缓存
CacheRule cacheRule = hotCaffeineDetector.getKeyRuleCacher().getCacheRule(keyRule);
if(cacheRule == null) {
ClientLogger.getLogger().warn("keyCount:{} cacheRule is null", keyCount);
@@ -137,6 +141,7 @@ public void newKey(KeyCount keyCount) {

public static class ReceiveNewKeyEvent {
private KeyCount keyCount;
private Channel channel;

public ReceiveNewKeyEvent(KeyCount keyCount) {
this.keyCount = keyCount;
@@ -149,5 +154,13 @@ public KeyCount getKeyCount() {
public void setKeyCount(KeyCount keyCount) {
this.keyCount = keyCount;
}

public Channel getChannel() {
return channel;
}

public void setChannel(Channel channel) {
this.channel = channel;
}
}
}
Original file line number Diff line number Diff line change
@@ -54,9 +54,6 @@ public class NettyClient {

private NettyTrafficMetrics nettyTrafficMetrics;

// worker不可达
private volatile boolean workerUnreachable;

public NettyClient(String appName) {
this.appName = appName;
nettyTrafficMetrics = new NettyTrafficMetrics();
@@ -101,7 +98,6 @@ private boolean channelIsOk(Channel channel) {
public synchronized void connect(Set<String> addresses) {
// 1.无变更直接返回
if (!changed(addresses)) {
setWorkerUnreachable();
return;
}
ClientLogger.getLogger().info("worker changed! old:{}, new:{}", channelTables.keySet(), addresses);
@@ -127,11 +123,6 @@ public synchronized void connect(Set<String> addresses) {
removeAddress(address, "etcd");
}
}
setWorkerUnreachable();
}

private void setWorkerUnreachable() {
workerUnreachable = channelTables.size() <= 0;
}

/**
@@ -216,10 +207,6 @@ public Set<String> getAddresses() {
return channelTables.keySet();
}

public boolean isWorkerUnreachable() {
return workerUnreachable;
}

/**
* 获取hash地址
*
@@ -263,23 +250,25 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E
}

@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, String message) {
protected void channelRead0(ChannelHandlerContext context, String message) {
Message msg = JsonUtil.toBean(message, Message.class);
if (MessageType.PONG == msg.getMessageType()) {
return;
}
if (MessageType.RESPONSE_NEW_KEY == msg.getMessageType()) {
KeyCount keyCount = JsonUtil.toBean(msg.getBody(), KeyCount.class);
ClientLogger.getLogger().info("receive new key:{}", msg);
EventBusUtil.post(new ReceiveNewKeyEvent(keyCount));
ClientLogger.getLogger().info("receive {} new key:{}", NettyUtil.parseRemoteAddr(context.channel()), msg);
ReceiveNewKeyEvent event = new ReceiveNewKeyEvent(keyCount);
event.setChannel(context.channel());
EventBusUtil.post(event);
} else if (MessageType.REQUEST_HOTKEY_VALUE == msg.getMessageType()) {
String key = msg.getBody();
ClientLogger.getLogger().info("receive requestHotValue key:{}", key);
EventBusUtil.post(new RequestHotValueEvent(channelHandlerContext.channel(), key,
EventBusUtil.post(new RequestHotValueEvent(context.channel(), key,
msg.getRequestId()));
} else {
ClientLogger.getLogger().info("unsupported msg type:{}", msg);
EventBusUtil.post(new UnsupportedMessageTypeEvent(channelHandlerContext.channel(), msg));
EventBusUtil.post(new UnsupportedMessageTypeEvent(context.channel(), msg));
}
}
}
Original file line number Diff line number Diff line change
@@ -70,12 +70,6 @@ public void push(Collection<KeyCount> list) {
if (list.size() == 0) {
return;
}
// worker不可达不再推送
if (nettyClient.isWorkerUnreachable()) {
ClientLogger.getLogger().warn("worker is unreachable, cannot push:{}", list.size());
return;
}
MetricsUtil.incrSendKeys(list.size());
Map<String, List<KeyCount>> map = new HashMap<>();
for (KeyCount keyCount : list) {
String server = nettyClient.choose(keyCount.getKey());
@@ -97,6 +91,9 @@ public void push(Collection<KeyCount> list) {
ClientLogger.getLogger().error("{} flush error", server, e);
}
}
if (map.size() > 0) {
MetricsUtil.incrSendKeys(list.size());
}
}

/**
Loading

0 comments on commit 01c9167

Please sign in to comment.