From 01c9167dc260baf7bebb4d25305bf99960813d59 Mon Sep 17 00:00:00 2001 From: yongfeigao <554312685@qq.com> Date: Thu, 22 Jul 2021 16:31:42 +0800 Subject: [PATCH] =?UTF-8?q?1.=E6=94=AF=E6=8C=81=E5=8A=A8=E6=80=81=E9=85=8D?= =?UTF-8?q?=E7=BD=AE=E4=B8=BA=E6=9C=AC=E5=9C=B0=E7=BC=93=E5=AD=98=E4=BD=BF?= =?UTF-8?q?=E7=94=A8;2.=E6=94=AF=E6=8C=81worker=E5=81=A5=E5=BA=B7=E6=A3=80?= =?UTF-8?q?=E6=B5=8B=E5=8F=8A=E9=99=8D=E7=BA=A7=E7=AD=96=E7=95=A5;?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- client-parent/annotation/pom.xml | 2 +- client-parent/client/pom.xml | 2 +- .../com/hotcaffeine/client/HotCaffeine.java | 5 +- .../client/HotCaffeineDetector.java | 15 +- .../listener/ReceiveNewKeyListener.java | 27 ++- .../hotcaffeine/client/netty/NettyClient.java | 25 +-- .../client/push/HashGroupPusher.java | 9 +- .../client/worker/HealthDetector.java | 212 ++++++++++++++++++ .../client/worker/HealthDetectorMBean.java | 18 ++ .../client/worker/TimeConsumed.java | 58 +++++ .../hotcaffeine/client/worker/WorkerStat.java | 135 +++++++++++ .../client/HotCaffeineDetectorTest.java | 2 +- .../client/src/test/resources/logback.xml | 3 + client-parent/common/pom.xml | 2 +- .../hotcaffeine/common/model/KeyCount.java | 64 +++++- .../com/hotcaffeine/common/model/KeyRule.java | 33 +-- .../common/model/KeyRuleCacher.java | 16 ++ .../com/hotcaffeine/common/util/IpUtil.java | 26 +-- client-parent/pom.xml | 2 +- .../dashboard/etcd/DashboardEtcdClient.java | 3 +- pom.xml | 2 +- .../worker/consumer/NewKeyConsumer.java | 2 +- .../worker/etcd/WorkerEtcdClient.java | 2 +- .../netty/processor/NewKeyProcessor.java | 3 + .../worker/pusher/AppServerPusher.java | 7 + .../worker/pusher/DashboardPusher.java | 3 + .../worker/task/TopHotKeyTask.java | 4 +- 27 files changed, 593 insertions(+), 89 deletions(-) create mode 100644 client-parent/client/src/main/java/com/hotcaffeine/client/worker/HealthDetector.java create mode 100644 client-parent/client/src/main/java/com/hotcaffeine/client/worker/HealthDetectorMBean.java create mode 100644 client-parent/client/src/main/java/com/hotcaffeine/client/worker/TimeConsumed.java create mode 100644 client-parent/client/src/main/java/com/hotcaffeine/client/worker/WorkerStat.java diff --git a/client-parent/annotation/pom.xml b/client-parent/annotation/pom.xml index bb3c58a..8ab7140 100644 --- a/client-parent/annotation/pom.xml +++ b/client-parent/annotation/pom.xml @@ -4,7 +4,7 @@ com.hotcaffeine client-parent - 1.0 + 1.1 annotation diff --git a/client-parent/client/pom.xml b/client-parent/client/pom.xml index 9479502..2cf182f 100644 --- a/client-parent/client/pom.xml +++ b/client-parent/client/pom.xml @@ -5,7 +5,7 @@ com.hotcaffeine client-parent - 1.0 + 1.1 client diff --git a/client-parent/client/src/main/java/com/hotcaffeine/client/HotCaffeine.java b/client-parent/client/src/main/java/com/hotcaffeine/client/HotCaffeine.java index 245189d..499aed4 100644 --- a/client-parent/client/src/main/java/com/hotcaffeine/client/HotCaffeine.java +++ b/client-parent/client/src/main/java/com/hotcaffeine/client/HotCaffeine.java @@ -121,8 +121,9 @@ public ValueModel getValueModel(String key, boolean useAsCache) { return null; } ValueModel value = localCache.get(key); - // worker不可达降级为caffeine - if (useAsCache && hotCaffeineDetector.getNettyClient().isWorkerUnreachable()) { + // 配置使用本地缓存或worker不健康降级为caffeine + if (useAsCache + && (keyRule.isUseAsLocalCache() || !hotCaffeineDetector.getWorkerHealthDetector().isHealthy())) { if (value == null) { value = new ValueModel(); localCache.set(key, new ValueModel()); diff --git a/client-parent/client/src/main/java/com/hotcaffeine/client/HotCaffeineDetector.java b/client-parent/client/src/main/java/com/hotcaffeine/client/HotCaffeineDetector.java index 829904e..e9d02d7 100644 --- a/client-parent/client/src/main/java/com/hotcaffeine/client/HotCaffeineDetector.java +++ b/client-parent/client/src/main/java/com/hotcaffeine/client/HotCaffeineDetector.java @@ -18,6 +18,7 @@ import com.hotcaffeine.client.listener.WorkerChangeListener; import com.hotcaffeine.client.netty.NettyClient; import com.hotcaffeine.client.push.HashGroupPusher; +import com.hotcaffeine.client.worker.HealthDetector; import com.hotcaffeine.common.etcd.DefaultEtcdConfig; import com.hotcaffeine.common.etcd.IEtcdConfig; import com.hotcaffeine.common.model.KeyCount; @@ -76,6 +77,9 @@ public class HotCaffeineDetector { // 本地热键检测缓存 private Cache hotDetectCache; + // worker健康探测 + private HealthDetector workerHealthDetector; + /** * 一个app一个实例 * @@ -86,6 +90,8 @@ public HotCaffeineDetector(IEtcdConfig etcdConfig) { throw new IllegalArgumentException("Only one instance in jvm!"); } this.appName = etcdConfig.getUser(); + // worker健康探测 + this.workerHealthDetector = new HealthDetector(); // 创建netty客户端 this.nettyClient = new NettyClient(appName); // 创建规则缓存器 @@ -114,7 +120,10 @@ public void start() { this.appEtcdClient.start(); // initLocalDetectCache initLocalDetectCache(); - ClientLogger.getLogger().info("appName:{} started", appName); + // workerHealthDetector + workerHealthDetector.setHotCaffeineDetector(this); + workerHealthDetector.start(); + ClientLogger.getLogger().info("HotCaffeineDetector:{} started", appName); } catch (Throwable e) { ClientLogger.getLogger().error("appName:{} start error", appName, e); } @@ -378,4 +387,8 @@ public static HotCaffeineDetector getInstance() { public NettyClient getNettyClient() { return nettyClient; } + + public HealthDetector getWorkerHealthDetector() { + return workerHealthDetector; + } } diff --git a/client-parent/client/src/main/java/com/hotcaffeine/client/listener/ReceiveNewKeyListener.java b/client-parent/client/src/main/java/com/hotcaffeine/client/listener/ReceiveNewKeyListener.java index bc25783..4ce35c7 100644 --- a/client-parent/client/src/main/java/com/hotcaffeine/client/listener/ReceiveNewKeyListener.java +++ b/client-parent/client/src/main/java/com/hotcaffeine/client/listener/ReceiveNewKeyListener.java @@ -13,9 +13,12 @@ import com.hotcaffeine.common.model.ValueModel; import com.hotcaffeine.common.util.ClientLogger; import com.hotcaffeine.common.util.MetricsUtil; +import com.hotcaffeine.common.util.NettyUtil; + import io.etcd.jetcd.shaded.com.google.common.eventbus.AllowConcurrentEvents; import io.etcd.jetcd.shaded.com.google.common.eventbus.Subscribe; import io.etcd.jetcd.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.netty.channel.Channel; /** * 客户端监听到有newKey事件 @@ -43,16 +46,11 @@ public ReceiveNewKeyListener(HotCaffeineDetector hotCaffeineDetector) { @Subscribe @AllowConcurrentEvents public void newKeyComing(ReceiveNewKeyEvent event) { + long now = System.currentTimeMillis(); KeyCount keyCount = event.getKeyCount(); if (keyCount == null) { return; } - // 收到新key推送 - newKey(keyCount); - } - - public void newKey(KeyCount keyCount) { - long now = System.currentTimeMillis(); String key = keyCount.getKey(); if (key == null || key.length() == 0) { ClientLogger.getLogger().warn("the keyCount:{} key is blank", keyCount); @@ -64,12 +62,18 @@ public void newKey(KeyCount keyCount) { ClientLogger.getLogger().warn("the key:{} time difference:{}, now:{} keyCreateAt:{}", key, (now - keyCount.getCreateTime()), now, keyCount.getCreateTime()); } - // 获取缓存 + // 内部key + if (keyCount.isInner()) { + hotCaffeineDetector.getWorkerHealthDetector().receiveHotKey(NettyUtil.parseRemoteAddr(event.getChannel()), keyCount); + return; + } + // 获取规则 KeyRule keyRule = hotCaffeineDetector.getKeyRuleCacher().findRule(key); if (keyRule == null) { ClientLogger.getLogger().warn("keyCount:{} keyRule is null", keyCount); return; } + // 获取缓存 CacheRule cacheRule = hotCaffeineDetector.getKeyRuleCacher().getCacheRule(keyRule); if(cacheRule == null) { ClientLogger.getLogger().warn("keyCount:{} cacheRule is null", keyCount); @@ -137,6 +141,7 @@ public void newKey(KeyCount keyCount) { public static class ReceiveNewKeyEvent { private KeyCount keyCount; + private Channel channel; public ReceiveNewKeyEvent(KeyCount keyCount) { this.keyCount = keyCount; @@ -149,5 +154,13 @@ public KeyCount getKeyCount() { public void setKeyCount(KeyCount keyCount) { this.keyCount = keyCount; } + + public Channel getChannel() { + return channel; + } + + public void setChannel(Channel channel) { + this.channel = channel; + } } } diff --git a/client-parent/client/src/main/java/com/hotcaffeine/client/netty/NettyClient.java b/client-parent/client/src/main/java/com/hotcaffeine/client/netty/NettyClient.java index 84fd956..a4028e2 100644 --- a/client-parent/client/src/main/java/com/hotcaffeine/client/netty/NettyClient.java +++ b/client-parent/client/src/main/java/com/hotcaffeine/client/netty/NettyClient.java @@ -54,9 +54,6 @@ public class NettyClient { private NettyTrafficMetrics nettyTrafficMetrics; - // worker不可达 - private volatile boolean workerUnreachable; - public NettyClient(String appName) { this.appName = appName; nettyTrafficMetrics = new NettyTrafficMetrics(); @@ -101,7 +98,6 @@ private boolean channelIsOk(Channel channel) { public synchronized void connect(Set addresses) { // 1.无变更直接返回 if (!changed(addresses)) { - setWorkerUnreachable(); return; } ClientLogger.getLogger().info("worker changed! old:{}, new:{}", channelTables.keySet(), addresses); @@ -127,11 +123,6 @@ public synchronized void connect(Set addresses) { removeAddress(address, "etcd"); } } - setWorkerUnreachable(); - } - - private void setWorkerUnreachable() { - workerUnreachable = channelTables.size() <= 0; } /** @@ -216,10 +207,6 @@ public Set getAddresses() { return channelTables.keySet(); } - public boolean isWorkerUnreachable() { - return workerUnreachable; - } - /** * 获取hash地址 * @@ -263,23 +250,25 @@ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws E } @Override - protected void channelRead0(ChannelHandlerContext channelHandlerContext, String message) { + protected void channelRead0(ChannelHandlerContext context, String message) { Message msg = JsonUtil.toBean(message, Message.class); if (MessageType.PONG == msg.getMessageType()) { return; } if (MessageType.RESPONSE_NEW_KEY == msg.getMessageType()) { KeyCount keyCount = JsonUtil.toBean(msg.getBody(), KeyCount.class); - ClientLogger.getLogger().info("receive new key:{}", msg); - EventBusUtil.post(new ReceiveNewKeyEvent(keyCount)); + ClientLogger.getLogger().info("receive {} new key:{}", NettyUtil.parseRemoteAddr(context.channel()), msg); + ReceiveNewKeyEvent event = new ReceiveNewKeyEvent(keyCount); + event.setChannel(context.channel()); + EventBusUtil.post(event); } else if (MessageType.REQUEST_HOTKEY_VALUE == msg.getMessageType()) { String key = msg.getBody(); ClientLogger.getLogger().info("receive requestHotValue key:{}", key); - EventBusUtil.post(new RequestHotValueEvent(channelHandlerContext.channel(), key, + EventBusUtil.post(new RequestHotValueEvent(context.channel(), key, msg.getRequestId())); } else { ClientLogger.getLogger().info("unsupported msg type:{}", msg); - EventBusUtil.post(new UnsupportedMessageTypeEvent(channelHandlerContext.channel(), msg)); + EventBusUtil.post(new UnsupportedMessageTypeEvent(context.channel(), msg)); } } } diff --git a/client-parent/client/src/main/java/com/hotcaffeine/client/push/HashGroupPusher.java b/client-parent/client/src/main/java/com/hotcaffeine/client/push/HashGroupPusher.java index c051268..b0f397b 100644 --- a/client-parent/client/src/main/java/com/hotcaffeine/client/push/HashGroupPusher.java +++ b/client-parent/client/src/main/java/com/hotcaffeine/client/push/HashGroupPusher.java @@ -70,12 +70,6 @@ public void push(Collection list) { if (list.size() == 0) { return; } - // worker不可达不再推送 - if (nettyClient.isWorkerUnreachable()) { - ClientLogger.getLogger().warn("worker is unreachable, cannot push:{}", list.size()); - return; - } - MetricsUtil.incrSendKeys(list.size()); Map> map = new HashMap<>(); for (KeyCount keyCount : list) { String server = nettyClient.choose(keyCount.getKey()); @@ -97,6 +91,9 @@ public void push(Collection list) { ClientLogger.getLogger().error("{} flush error", server, e); } } + if (map.size() > 0) { + MetricsUtil.incrSendKeys(list.size()); + } } /** diff --git a/client-parent/client/src/main/java/com/hotcaffeine/client/worker/HealthDetector.java b/client-parent/client/src/main/java/com/hotcaffeine/client/worker/HealthDetector.java new file mode 100644 index 0000000..26d9d59 --- /dev/null +++ b/client-parent/client/src/main/java/com/hotcaffeine/client/worker/HealthDetector.java @@ -0,0 +1,212 @@ +package com.hotcaffeine.client.worker; + +import java.lang.management.ManagementFactory; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +import javax.management.MBeanServer; +import javax.management.ObjectName; + +import com.hotcaffeine.client.HotCaffeineDetector; +import com.hotcaffeine.common.model.KeyCount; +import com.hotcaffeine.common.model.KeyRule; +import com.hotcaffeine.common.model.Message; +import com.hotcaffeine.common.model.MessageType; +import com.hotcaffeine.common.util.ClientLogger; +import com.hotcaffeine.common.util.JsonUtil; + +import io.etcd.jetcd.shaded.com.google.common.util.concurrent.ThreadFactoryBuilder; + +/** + * worker健康探测 + * + * @author yongfeigao + * @date 2021年7月16日 + */ +public class HealthDetector implements HealthDetectorMBean { + // 最大响应时间 + public static final int MAX_RESPONSE_TIME_IN_MILLIS = 500; + + // worker状态map + private ConcurrentMap workerStatMap = new ConcurrentHashMap<>(); + + // 健康 + private volatile boolean healthy = true; + + private HotCaffeineDetector hotCaffeineDetector; + + // 任务执行 + private ScheduledExecutorService sendKeyExecutorService; + + // 任务执行 + private ScheduledExecutorService detectExecutorService; + + /** + * 启动任务 + */ + public void start() { + sendKeyExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat( + "sendKeyExecutorService").setDaemon(true).build()); + // 定时发送检测key + sendKeyExecutorService.scheduleAtFixedRate(() -> { + try { + sendDetectKey(); + } catch (Throwable e) { + ClientLogger.getLogger().error("workerHealthDetector sendDetectKey error:{}", e.getMessage()); + } + }, 0, 5000, TimeUnit.MILLISECONDS); + // 定时检测任务 + detectExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryBuilder().setNameFormat( + "detectExecutorService").setDaemon(true).build()); + detectExecutorService.scheduleAtFixedRate(() -> { + try { + detect(); + } catch (Throwable e) { + ClientLogger.getLogger().error("workerHealthDetector detect error:{}", e.getMessage()); + } + }, 31000, 31000, TimeUnit.MILLISECONDS); + registerMBean(); + ClientLogger.getLogger().info("workerHealthDetector start"); + } + + /** + * 发送探测key + */ + public void sendDetectKey() { + String appName = hotCaffeineDetector.getAppName(); + // 选择一个keyRule + KeyRule keyRule = hotCaffeineDetector.getKeyRuleCacher().selectKeyRule(); + if (keyRule == null) { + ClientLogger.getLogger().warn("workerHealthDetector appName:{} keyRule is null", appName); + return; + } + // 向所有worker发送探测 + for (String address : hotCaffeineDetector.getNettyClient().getAddresses()) { + KeyCount keyCount = KeyCount.buildInnerKeyCount(appName, keyRule); + try { + Message message = sendKeyCount(address, keyCount); + workerStatMap.computeIfAbsent(address, k -> new WorkerStat(address)) + .addTimeConsumed(new TimeConsumed(message.getCreateTime(), keyCount.getKey())); + } catch (Exception e) { + ClientLogger.getLogger().warn("workerHealthDetector server:{} sendDetectKey error:{}", address, + e.toString()); + } + } + } + + /** + * 发送keyCount + * + * @param address + * @param keyCount + * @return + * @throws InterruptedException + */ + private Message sendKeyCount(String address, KeyCount keyCount) throws InterruptedException { + List list = new LinkedList<>(); + list.add(keyCount); + Message message = new Message(hotCaffeineDetector.getAppName(), MessageType.REQUEST_NEW_KEY, + JsonUtil.toJSON(list), System.currentTimeMillis()); + hotCaffeineDetector.getNettyClient().writeAndFlush(address, message); + return message; + } + + /** + * 收到热key + * + * @param keyCount + */ + public void receiveHotKey(String address, KeyCount keyCount) { + WorkerStat workerStat = workerStatMap.get(address); + if (workerStat == null) { + ClientLogger.getLogger().warn("workerHealthDetector:{} key:{} no matched request", address, + keyCount.getKey()); + return; + } + workerStat.setConsumed(keyCount.getKey()); + } + + /** + * 检测 + */ + public void detect() { + int unhealthyCount = 0; + int totalCount = 0; + // 检测 + for (String address : hotCaffeineDetector.getNettyClient().getAddresses()) { + WorkerStat workerStat = workerStatMap.get(address); + if (workerStat == null) { + continue; + } + ++totalCount; + if (!workerStat.healthy()) { + ++unhealthyCount; + } + } + // 移除不用的worker + for (String address : workerStatMap.keySet()) { + if (!hotCaffeineDetector.getNettyClient().getAddresses().contains(address)) { + workerStatMap.remove(address); + ClientLogger.getLogger().info("WorkerHealthDetector remove:{}", address); + } + } + // 没有任何检测不健康 + if (totalCount == 0) { + setHealthy(false); + ClientLogger.getLogger().info("WorkerHealthDetector unhealthy! total:{}", totalCount); + return; + } + // 任意worker不健康即不健康 + if (unhealthyCount > 0) { + setHealthy(false); + ClientLogger.getLogger().info("WorkerHealthDetector unhealthy! count:{}", unhealthyCount); + return; + } + ClientLogger.getLogger().info("WorkerHealthDetector healthy! total:{}", totalCount); + setHealthy(true); + } + + /** + * 关闭 + */ + public void shutdown() { + sendKeyExecutorService.shutdown(); + detectExecutorService.shutdown(); + ClientLogger.getLogger().info("WorkerHealthDetector shutdown!"); + } + + private void registerMBean() { + try { + String mbeanName = "com.hotcaffeine:name=workerStat"; + ObjectName objectName = new ObjectName(mbeanName); + MBeanServer mBeanServer = ManagementFactory.getPlatformMBeanServer(); + ClientLogger.getLogger().info("register mbean:{}", mbeanName); + mBeanServer.registerMBean(this, objectName); + } catch (Throwable e) { + ClientLogger.getLogger().warn("mqmetrics mbean register error:{}", e.getMessage()); + } + } + + public void setHotCaffeineDetector(HotCaffeineDetector hotCaffeineDetector) { + this.hotCaffeineDetector = hotCaffeineDetector; + } + + public boolean isHealthy() { + return healthy; + } + + public void setHealthy(boolean healthy) { + this.healthy = healthy; + } + + @Override + public Map getWorkerStat() { + return workerStatMap; + } +} diff --git a/client-parent/client/src/main/java/com/hotcaffeine/client/worker/HealthDetectorMBean.java b/client-parent/client/src/main/java/com/hotcaffeine/client/worker/HealthDetectorMBean.java new file mode 100644 index 0000000..548025b --- /dev/null +++ b/client-parent/client/src/main/java/com/hotcaffeine/client/worker/HealthDetectorMBean.java @@ -0,0 +1,18 @@ +package com.hotcaffeine.client.worker; + +import java.util.Map; + +/** + * worker状态 mbean + * + * @author yongfeigao + * @date 2021年7月21日 + */ +public interface HealthDetectorMBean { + + /** + * 获取worker状态 + * @return + */ + public Map getWorkerStat(); +} diff --git a/client-parent/client/src/main/java/com/hotcaffeine/client/worker/TimeConsumed.java b/client-parent/client/src/main/java/com/hotcaffeine/client/worker/TimeConsumed.java new file mode 100644 index 0000000..6776296 --- /dev/null +++ b/client-parent/client/src/main/java/com/hotcaffeine/client/worker/TimeConsumed.java @@ -0,0 +1,58 @@ +package com.hotcaffeine.client.worker; + +/** + * 时间消耗 + * + * @author yongfeigao + * @date 2021年7月21日 + */ +public class TimeConsumed { + // 发送的key + private String key; + // 开始时间 + private long start; + // 消耗时间 + private long consumed = -1; + // 是否检测过 + private boolean detected; + + public TimeConsumed() { + } + + public TimeConsumed(long start, String key) { + this.start = start; + this.key = key; + } + + public long getStart() { + return start; + } + + public void setStart(long start) { + this.start = start; + } + + public long getConsumed() { + return consumed; + } + + public void setConsumed(long consumed) { + this.consumed = consumed; + } + + public String getKey() { + return key; + } + + public void setKey(String key) { + this.key = key; + } + + public boolean isDetected() { + return detected; + } + + public void setDetected(boolean detected) { + this.detected = detected; + } +} diff --git a/client-parent/client/src/main/java/com/hotcaffeine/client/worker/WorkerStat.java b/client-parent/client/src/main/java/com/hotcaffeine/client/worker/WorkerStat.java new file mode 100644 index 0000000..bc27024 --- /dev/null +++ b/client-parent/client/src/main/java/com/hotcaffeine/client/worker/WorkerStat.java @@ -0,0 +1,135 @@ +package com.hotcaffeine.client.worker; + +import com.hotcaffeine.common.util.ClientLogger; +/** + * worker状态 + * + * @author yongfeigao + * @date 2021年7月21日 + */ +public class WorkerStat { + // 最大响应时间 + public static final int MAX_RESPONSE_TIME_IN_MILLIS = 500; + + // 时间消耗数组 + private TimeConsumed[] timeConsumedArray; + // 数组索引 + private long index; + + // worker + private String worker; + + // 是否健康 + private boolean healthy = true; + + public WorkerStat(String worker) { + this.worker = worker; + this.timeConsumedArray = new TimeConsumed[7]; + } + + public void addTimeConsumed(TimeConsumed timeConsumed) { + timeConsumedArray[(int) (index++ % timeConsumedArray.length)] = timeConsumed; + } + + public void setConsumed(String key) { + for (TimeConsumed timeConsumed : timeConsumedArray) { + if (timeConsumed == null) { + continue; + } + if (!timeConsumed.isDetected() && key.equals(timeConsumed.getKey())) { + timeConsumed.setConsumed(System.currentTimeMillis() - timeConsumed.getStart()); + return; + } + } + } + + /** + * 健康判定,每30秒检测6次,如下情况被认定为不健康: + * 1.没有检测。 + * 2.一半及以上的检测无响应。 + * 3.一半及以上的检测超时。 + * 4.检测平均耗时超过阈值。 + * + * @return + */ + public boolean healthy() { + int totalSize = 0; + int invalidSize = 0; + int timeoutSize = 0; + long totalConsumed = 0; + for (TimeConsumed timeConsumed : timeConsumedArray) { + if (timeConsumed == null || timeConsumed.isDetected()) { + continue; + } + ++totalSize; + // 没有响应的暂时不处理 + if (timeConsumed.getConsumed() < 0) { + ++invalidSize; + } else { + // 超时 + if (timeConsumed.getConsumed() > MAX_RESPONSE_TIME_IN_MILLIS) { + ++timeoutSize; + ClientLogger.getLogger().warn("workerHealthDetector:{} key:{} hot use:{}ms too long!", + worker, timeConsumed.getKey(), timeConsumed.getConsumed()); + } + totalConsumed += timeConsumed.getConsumed(); + } + timeConsumed.setDetected(true); + } + // 没有检测数据 + if (totalSize == 0) { + ClientLogger.getLogger().warn("workerHealthDetector:{} unhealthy! no request", worker); + return setHealthy(false); + } + // 一半的检测没有响应数据 + if (totalSize > 1 && invalidSize / (double) totalSize >= 0.5) { + ClientLogger.getLogger().warn("workerHealthDetector:{} unhealthy! invalid:{} total:{}", worker, + invalidSize, totalSize); + return setHealthy(false); + } + // 平均耗时超过阈值 + double avgConsumed = totalConsumed / (double) totalSize; + if (avgConsumed >= MAX_RESPONSE_TIME_IN_MILLIS) { + ClientLogger.getLogger().warn("workerHealthDetector:{} unhealthy! avgConsumed:{}ms total:{}", + worker, format(avgConsumed), totalSize); + return setHealthy(false); + } + // 一半的检测超时 + if (timeoutSize / (double) totalSize >= 0.5) { + ClientLogger.getLogger().warn("workerHealthDetector:{} unhealthy! timeout:{} total:{}", worker, + timeoutSize, totalSize); + return setHealthy(false); + } + ClientLogger.getLogger().info("workerHealthDetector:{} healthy, avgConsumed:{}ms", + worker, format(avgConsumed)); + return setHealthy(true); + } + + /** + * 保留微秒 + * + * @param value + * @return + */ + private double format(double value) { + long v = (long) (value * 1000); + return v / 1000D; + } + + public boolean isHealthy() { + return healthy; + } + + public boolean setHealthy(boolean healthy) { + this.healthy = healthy; + return this.healthy; + } + + public TimeConsumed[] getTimeConsumedArray() { + return timeConsumedArray; + } + + public long getIndex() { + return index; + } +} diff --git a/client-parent/client/src/test/java/com/hotcaffeine/client/HotCaffeineDetectorTest.java b/client-parent/client/src/test/java/com/hotcaffeine/client/HotCaffeineDetectorTest.java index ca24b98..2407d48 100644 --- a/client-parent/client/src/test/java/com/hotcaffeine/client/HotCaffeineDetectorTest.java +++ b/client-parent/client/src/test/java/com/hotcaffeine/client/HotCaffeineDetectorTest.java @@ -35,7 +35,7 @@ public void test() throws InterruptedException { String key = "api:user:123"; while(true) { hotSpotCache.getValue(key, k->"{\"id\":123, \"name\":\"hotcaffeine\", \"age\": 1}"); - Thread.sleep(1); + Thread.sleep(100); } } diff --git a/client-parent/client/src/test/resources/logback.xml b/client-parent/client/src/test/resources/logback.xml index cbe2072..ae36920 100644 --- a/client-parent/client/src/test/resources/logback.xml +++ b/client-parent/client/src/test/resources/logback.xml @@ -10,6 +10,9 @@ %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + diff --git a/client-parent/common/pom.xml b/client-parent/common/pom.xml index ac7084a..c60f00f 100644 --- a/client-parent/common/pom.xml +++ b/client-parent/common/pom.xml @@ -5,7 +5,7 @@ com.hotcaffeine client-parent - 1.0 + 1.1 4.0.0 diff --git a/client-parent/common/src/main/java/com/hotcaffeine/common/model/KeyCount.java b/client-parent/common/src/main/java/com/hotcaffeine/common/model/KeyCount.java index 0d13d9a..54be697 100644 --- a/client-parent/common/src/main/java/com/hotcaffeine/common/model/KeyCount.java +++ b/client-parent/common/src/main/java/com/hotcaffeine/common/model/KeyCount.java @@ -1,5 +1,9 @@ package com.hotcaffeine.common.model; +import com.hotcaffeine.common.util.IpUtil; + +import io.netty.channel.Channel; + /** * 热key的定义 * @@ -7,6 +11,9 @@ * @version 1.0 */ public class KeyCount { + // 内部key计数器 + public static long innerKeyCounter; + /** * 创建的时间 */ @@ -28,17 +35,11 @@ public class KeyCount { * 是否是删除事件 */ private boolean remove; - - @Override - public String toString() { - return "keyCount{" + - "createTime=" + createTime + - ", key='" + key + '\'' + - ", count=" + count + - ", appName='" + appName + '\'' + - ", remove=" + remove + - '}'; - } + + // 是否是内部key + private boolean isInner; + + private transient Channel channel; public boolean isRemove() { return remove; @@ -83,4 +84,45 @@ public void setKey(String key) { public String uniqueKey() { return appName + key; } + + public static KeyCount buildInnerKeyCount(String appName, KeyRule keyRule) { + KeyCount keyCount = new KeyCount(); + keyCount.setAppName(appName); + keyCount.setCount(keyRule.getThreshold()); + keyCount.setInner(true); + String key = IpUtil.INSTANCE_ID + ":" + (innerKeyCounter++); + if (keyRule.isPrefix()) { + keyCount.setKey(keyRule.getKey() + ":" + key); + } else { + keyCount.setKey(KeyRule.buildFullKey(keyRule.getKey(), key)); + } + return keyCount; + } + + public boolean isInner() { + return isInner; + } + + public void setInner(boolean isInner) { + this.isInner = isInner; + } + + public Channel getChannel() { + return channel; + } + + public void setChannel(Channel channel) { + this.channel = channel; + } + + @Override + public String toString() { + return "keyCount{" + + "createTime=" + createTime + + ", key='" + key + '\'' + + ", count=" + count + + ", appName='" + appName + '\'' + + ", remove=" + remove + + '}'; + } } diff --git a/client-parent/common/src/main/java/com/hotcaffeine/common/model/KeyRule.java b/client-parent/common/src/main/java/com/hotcaffeine/common/model/KeyRule.java index 9642b87..1a11a6d 100644 --- a/client-parent/common/src/main/java/com/hotcaffeine/common/model/KeyRule.java +++ b/client-parent/common/src/main/java/com/hotcaffeine/common/model/KeyRule.java @@ -24,11 +24,6 @@ public class KeyRule { // 开启本地检测 private boolean enableLocalDetector; - /** - * 描述 - */ - private String desc; - // 目标qps private double destQps = Double.MAX_VALUE; @@ -47,6 +42,9 @@ public class KeyRule { // 空值过期时间 private int nullValueExpire; + + // 当做本地缓存用(不用检测) + private boolean useAsLocalCache; public boolean isPrefix() { return prefix; @@ -68,10 +66,6 @@ public boolean isEnableLocalDetector() { return enableLocalDetector; } - public String getDesc() { - return desc; - } - public int getInterval() { return interval; } @@ -80,10 +74,6 @@ public int getThreshold() { return threshold; } - public void setDesc(String desc) { - this.desc = desc; - } - public void setInterval(int interval) { this.interval = interval; } @@ -189,6 +179,14 @@ public void setNullValueExpire(int nullValueExpire) { this.nullValueExpire = nullValueExpire; } + public boolean isUseAsLocalCache() { + return useAsLocalCache; + } + + public void setUseAsLocalCache(boolean useAsLocalCache) { + this.useAsLocalCache = useAsLocalCache; + } + @Override public int hashCode() { final int prime = 31; @@ -202,6 +200,7 @@ public int hashCode() { result = prime * result + (prefix ? 1231 : 1237); result = prime * result + threshold; result = prime * result + topkCount; + result = prime * result + (useAsLocalCache ? 1231 : 1237); result = prime * result + (useTopKAsHotKey ? 1231 : 1237); return result; } @@ -239,6 +238,8 @@ public boolean equals(Object obj) { return false; if (topkCount != other.topkCount) return false; + if (useAsLocalCache != other.useAsLocalCache) + return false; if (useTopKAsHotKey != other.useTopKAsHotKey) return false; return true; @@ -247,8 +248,8 @@ public boolean equals(Object obj) { @Override public String toString() { return "KeyRule [key=" + key + ", prefix=" + prefix + ", interval=" + interval + ", threshold=" + threshold - + ", enableLocalDetector=" + enableLocalDetector + ", desc=" + desc + ", destQps=" + destQps - + ", topkCount=" + topkCount + ", useTopKAsHotKey=" + useTopKAsHotKey + ", cacheName=" + cacheName - + ", disabled=" + disabled + ", nullValueExpire=" + nullValueExpire + "]"; + + ", enableLocalDetector=" + enableLocalDetector + ", destQps=" + destQps + ", topkCount=" + topkCount + + ", useTopKAsHotKey=" + useTopKAsHotKey + ", cacheName=" + cacheName + ", disabled=" + disabled + + ", nullValueExpire=" + nullValueExpire + ", useAsLocalCache=" + useAsLocalCache + "]"; } } diff --git a/client-parent/common/src/main/java/com/hotcaffeine/common/model/KeyRuleCacher.java b/client-parent/common/src/main/java/com/hotcaffeine/common/model/KeyRuleCacher.java index f7aec80..d04e012 100644 --- a/client-parent/common/src/main/java/com/hotcaffeine/common/model/KeyRuleCacher.java +++ b/client-parent/common/src/main/java/com/hotcaffeine/common/model/KeyRuleCacher.java @@ -328,6 +328,22 @@ public List getKeyRuleList() { return keyRuleList; } + /** + * 任意选择一个KeyRule + * + * @return + */ + public KeyRule selectKeyRule() { + for (KeyRule keyRule : keyRuleMap.values()) { + return keyRule; + } + List keyRuleList = prefixKeyRuleList; + for (KeyRule keyRule : keyRuleList) { + return keyRule; + } + return null; + } + public static class CacheRuleChangeEvent { private List cacheRuleList; diff --git a/client-parent/common/src/main/java/com/hotcaffeine/common/util/IpUtil.java b/client-parent/common/src/main/java/com/hotcaffeine/common/util/IpUtil.java index 5869692..9baf072 100644 --- a/client-parent/common/src/main/java/com/hotcaffeine/common/util/IpUtil.java +++ b/client-parent/common/src/main/java/com/hotcaffeine/common/util/IpUtil.java @@ -18,10 +18,11 @@ package com.hotcaffeine.common.util; +import java.lang.management.ManagementFactory; +import java.lang.management.RuntimeMXBean; import java.net.InetAddress; import java.net.NetworkInterface; import java.net.SocketException; -import java.net.UnknownHostException; import java.util.Enumeration; /** @@ -31,12 +32,9 @@ */ public final class IpUtil { - /** - * IP地址的正则表达式. - */ - public static final String IP_REGEX = "((\\d|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])(\\.(\\d|[1-9]\\d|1\\d{2}|2[0-4]\\d|25[0-5])){3})"; - private static volatile String cachedIpAddress; + + public static String INSTANCE_ID = getIp() + ":" + getPid(); /** * 获取本机IP地址. @@ -90,17 +88,13 @@ private static boolean isV6IpAddress(final InetAddress ipAddress) { return ipAddress.getHostAddress().contains(":"); } - /** - * 获取本机Host名称. - * - * @return 本机Host名称 - */ - public static String getHostName() { + public static int getPid() { + RuntimeMXBean runtime = ManagementFactory.getRuntimeMXBean(); + String name = runtime.getName(); // format: "pid@hostname" try { - return InetAddress.getLocalHost().getHostName(); - } catch (final UnknownHostException ex) { - return null; + return Integer.parseInt(name.substring(0, name.indexOf('@'))); + } catch (Exception e) { + return -1; } } - } \ No newline at end of file diff --git a/client-parent/pom.xml b/client-parent/pom.xml index 0e21e9c..e1f2a1e 100644 --- a/client-parent/pom.xml +++ b/client-parent/pom.xml @@ -5,7 +5,7 @@ 4.0.0 com.hotcaffeine client-parent - 1.0 + 1.1 pom diff --git a/dashboard/src/main/java/com/hotcaffeine/dashboard/etcd/DashboardEtcdClient.java b/dashboard/src/main/java/com/hotcaffeine/dashboard/etcd/DashboardEtcdClient.java index f9cca0b..750de4b 100644 --- a/dashboard/src/main/java/com/hotcaffeine/dashboard/etcd/DashboardEtcdClient.java +++ b/dashboard/src/main/java/com/hotcaffeine/dashboard/etcd/DashboardEtcdClient.java @@ -152,8 +152,7 @@ private void deleteSelfInfo() { } private String buildKey() { - String hostName = IpUtil.getHostName(); - return etcdConfig.getDashboardPath() + hostName; + return etcdConfig.getDashboardPath() + IpUtil.INSTANCE_ID; } private String buildValue() { diff --git a/pom.xml b/pom.xml index c587401..7f87bbe 100644 --- a/pom.xml +++ b/pom.xml @@ -16,7 +16,7 @@ - 1.0 + 1.1 1.8 UTF-8 1.1.3 diff --git a/worker/src/main/java/com/hotcaffeine/worker/consumer/NewKeyConsumer.java b/worker/src/main/java/com/hotcaffeine/worker/consumer/NewKeyConsumer.java index efc5df5..17052c6 100644 --- a/worker/src/main/java/com/hotcaffeine/worker/consumer/NewKeyConsumer.java +++ b/worker/src/main/java/com/hotcaffeine/worker/consumer/NewKeyConsumer.java @@ -102,7 +102,7 @@ public void consume(KeyCount keyCount) { caffeineCache.delete(key); // 开启推送 keyCount.setCreateTime(System.currentTimeMillis()); - logger.info("appName:{} key:{} hot:{} rule:{}", keyCount.getAppName(), key, count, keyRule.getKey()); + logger.info("appName:{} key:{} inner:{} rule:{} hot:{}", keyCount.getAppName(), key, keyCount.isInner(), keyRule.getKey(), count); // 分别推送到各client和dashboard MetricsUtil.incrSendKeys(); for (IPusher pusher : iPushers) { diff --git a/worker/src/main/java/com/hotcaffeine/worker/etcd/WorkerEtcdClient.java b/worker/src/main/java/com/hotcaffeine/worker/etcd/WorkerEtcdClient.java index 6132b00..2de6a61 100644 --- a/worker/src/main/java/com/hotcaffeine/worker/etcd/WorkerEtcdClient.java +++ b/worker/src/main/java/com/hotcaffeine/worker/etcd/WorkerEtcdClient.java @@ -202,7 +202,7 @@ private String buildKey() { if (!etcdConfig.isDefaultWorker()) { workerPath = etcdConfig.getWorkerForAppPath(); } - return workerPath + "/" + IpUtil.getHostName(); + return workerPath + "/" + IpUtil.INSTANCE_ID; } private String getIp() { diff --git a/worker/src/main/java/com/hotcaffeine/worker/netty/processor/NewKeyProcessor.java b/worker/src/main/java/com/hotcaffeine/worker/netty/processor/NewKeyProcessor.java index e386b7d..5166bb1 100644 --- a/worker/src/main/java/com/hotcaffeine/worker/netty/processor/NewKeyProcessor.java +++ b/worker/src/main/java/com/hotcaffeine/worker/netty/processor/NewKeyProcessor.java @@ -50,6 +50,9 @@ public void process(Message message, Channel channel) { MetricsUtil.incrReceiveKeys(models.size()); for (KeyCount keyCount : models) { keyCount.setAppName(message.getAppName()); + if (keyCount.isInner()) { + keyCount.setChannel(channel); + } if (!memoryMQGroup.offer(keyCount)) { logger.warn("offer:{} failed, maybe full", keyCount); } diff --git a/worker/src/main/java/com/hotcaffeine/worker/pusher/AppServerPusher.java b/worker/src/main/java/com/hotcaffeine/worker/pusher/AppServerPusher.java index a1e3573..fdf8e65 100644 --- a/worker/src/main/java/com/hotcaffeine/worker/pusher/AppServerPusher.java +++ b/worker/src/main/java/com/hotcaffeine/worker/pusher/AppServerPusher.java @@ -33,6 +33,13 @@ public class AppServerPusher implements IPusher { */ @Override public void push(KeyCount keyCount) { + // 内部消息处理 + if (keyCount.isInner()) { + keyCount.getChannel().writeAndFlush( + NettyUtil.buildByteBuf(new Message(MessageType.RESPONSE_NEW_KEY, JsonUtil.toJSON(keyCount)))); + return; + } + // 正常消息处理 ChannelGroup channelGroup = clientChannelProcessor.getChannelGroup(keyCount.getAppName()); if (channelGroup == null) { logger.error("app:{} no channel", keyCount.getAppName()); diff --git a/worker/src/main/java/com/hotcaffeine/worker/pusher/DashboardPusher.java b/worker/src/main/java/com/hotcaffeine/worker/pusher/DashboardPusher.java index f95b25b..423dfc8 100644 --- a/worker/src/main/java/com/hotcaffeine/worker/pusher/DashboardPusher.java +++ b/worker/src/main/java/com/hotcaffeine/worker/pusher/DashboardPusher.java @@ -20,6 +20,9 @@ public class DashboardPusher implements IPusher { @Override public void push(KeyCount keyCount) { + if (keyCount.isInner()) { + return; + } dashboardMemoryMQ.offer(keyCount); } } diff --git a/worker/src/main/java/com/hotcaffeine/worker/task/TopHotKeyTask.java b/worker/src/main/java/com/hotcaffeine/worker/task/TopHotKeyTask.java index 4aae581..928b4a3 100644 --- a/worker/src/main/java/com/hotcaffeine/worker/task/TopHotKeyTask.java +++ b/worker/src/main/java/com/hotcaffeine/worker/task/TopHotKeyTask.java @@ -125,7 +125,7 @@ private void mergeAndPush(String appName, KeyRule keyRule, String timeKey) { if (!keyRule.isUseTopKAsHotKey()) { return; } - push(appName, keyRule, topHotKey.getHotKeyList()); + push(appName, topHotKey.getHotKeyList()); } catch (Exception e) { hotKeyLogger.error(e.getMessage(), e); } @@ -140,7 +140,7 @@ private void mergeAndPush(String appName, KeyRule keyRule, String timeKey) { * @param k * @param v */ - private void push(String appName, KeyRule keyRule, List hotKeyList) { + private void push(String appName, List hotKeyList) { KeyCount keyCount = new KeyCount(); keyCount.setAppName(appName); for (HotKey hotKey : hotKeyList) {