diff --git a/collector/src/main/java/com/zmops/open/collector/dispatch/CommonDispatcher.java b/collector/src/main/java/com/zmops/open/collector/dispatch/CommonDispatcher.java index c8d59f6..aaa4fc1 100644 --- a/collector/src/main/java/com/zmops/open/collector/dispatch/CommonDispatcher.java +++ b/collector/src/main/java/com/zmops/open/collector/dispatch/CommonDispatcher.java @@ -19,6 +19,7 @@ import com.google.gson.Gson; import com.google.gson.JsonElement; +import com.zmops.open.collector.dispatch.entrance.zabbix.agent.ZabbixAgentService; import com.zmops.open.collector.dispatch.timer.Timeout; import com.zmops.open.collector.dispatch.timer.TimerDispatch; import com.zmops.open.collector.dispatch.timer.WheelTimerTask; @@ -64,6 +65,9 @@ public class CommonDispatcher implements MetricsTaskDispatch, CollectDataDispatc * 触发子任务最大数量 */ private static final int MAX_SUB_TASK_NUM = 50; + + private static final long ITEM_ID_START = 10000L; + private static final long ITEM_ID_END = 99999L; private static final Gson GSON = new Gson(); /** * Priority queue of index group collection tasks @@ -80,6 +84,7 @@ public class CommonDispatcher implements MetricsTaskDispatch, CollectDataDispatc * 采集数据导出器 */ private CommonDataQueue commonDataQueue; + private ZabbixAgentService zabbixAgentService; /** * Metric group task and start time mapping map * 指标组任务与开始时间映射map @@ -91,11 +96,13 @@ public class CommonDispatcher implements MetricsTaskDispatch, CollectDataDispatc public CommonDispatcher(MetricsCollectorQueue jobRequestQueue, TimerDispatch timerDispatch, CommonDataQueue commonDataQueue, + ZabbixAgentService zabbixAgentService, WorkerPool workerPool, List unitConvertList) { this.commonDataQueue = commonDataQueue; this.jobRequestQueue = jobRequestQueue; this.timerDispatch = timerDispatch; + this.zabbixAgentService = zabbixAgentService; this.unitConvertList = unitConvertList; ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2, 2, 1, TimeUnit.SECONDS, @@ -206,7 +213,12 @@ public void dispatchCollectData(Timeout timeout, Metrics metrics, CollectRep.Met if (job.isCyclic()) { // If it is an asynchronous periodic cyclic task, directly send the collected data of the indicator group to the message middleware // 若是异步的周期性循环任务,直接发送指标组的采集数据到消息中间件 - commonDataQueue.sendMetricsData(metricsData); + if (job.getMonitorId() <= ITEM_ID_END && job.getMonitorId() >= ITEM_ID_START) { + // from zabbix send data to zabbix + zabbixAgentService.sendMetricsData(metricsData); + } else { + commonDataQueue.sendMetricsData(metricsData); + } if (log.isDebugEnabled()) { log.debug("Cyclic Job: {}",metricsData.getMetrics()); for (CollectRep.ValueRow valueRow : metricsData.getValuesList()) { diff --git a/collector/src/main/java/com/zmops/open/collector/dispatch/DispatchProperties.java b/collector/src/main/java/com/zmops/open/collector/dispatch/DispatchProperties.java index 245c08e..ad0ff43 100644 --- a/collector/src/main/java/com/zmops/open/collector/dispatch/DispatchProperties.java +++ b/collector/src/main/java/com/zmops/open/collector/dispatch/DispatchProperties.java @@ -25,7 +25,7 @@ * 调度分发任务配置属性 * * @author tomsun28 from hertzbeat - * @date 2021/10/16 14:54 + * @date 2023/03/06 13:17 */ @Component @ConfigurationProperties(prefix = "collector.dispatch") @@ -72,6 +72,11 @@ public static class EntranceProperties { */ private EtcdProperties etcd; + /** + * zabbix properties + */ + private ZabbixProperties zabbix; + public EtcdProperties getEtcd() { return etcd; } @@ -80,13 +85,21 @@ public void setEtcd(EtcdProperties etcd) { this.etcd = etcd; } + public ZabbixProperties getZabbix() { + return zabbix; + } + + public void setZabbix(ZabbixProperties zabbix) { + this.zabbix = zabbix; + } + public static class EtcdProperties { /** * Whether etcd scheduling is started * etcd调度是否启动 */ - private boolean enabled = true; + private boolean enabled = false; /** * etcd's connection endpoint url @@ -194,6 +207,61 @@ public void setJobDir(String jobDir) { this.jobDir = jobDir; } } + + public static class ZabbixProperties { + /** + * Whether zabbix scheduling is started + * zabbix调度是否启动 + */ + private boolean enabled = false; + + /** + * zabbix server host + */ + private String host; + + /** + * zabbix server port + */ + private Integer port = 10051; + + /** + * zabbix agent host name + */ + private String agentHost = "ArgusDBM"; + + public boolean isEnabled() { + return enabled; + } + + public void setEnabled(boolean enabled) { + this.enabled = enabled; + } + + public String getHost() { + return host; + } + + public void setHost(String host) { + this.host = host; + } + + public Integer getPort() { + return port; + } + + public void setPort(Integer port) { + this.port = port; + } + + public String getAgentHost() { + return agentHost; + } + + public void setAgentHost(String agentHost) { + this.agentHost = agentHost; + } + } } /** diff --git a/collector/src/main/java/com/zmops/open/collector/dispatch/MetricsCollect.java b/collector/src/main/java/com/zmops/open/collector/dispatch/MetricsCollect.java index afe89ae..5247857 100644 --- a/collector/src/main/java/com/zmops/open/collector/dispatch/MetricsCollect.java +++ b/collector/src/main/java/com/zmops/open/collector/dispatch/MetricsCollect.java @@ -47,7 +47,7 @@ * 指标组采集 * * @author tomsun28 from hertzbeat - * @date 2021/10/10 15:35 + * @date 2023/03/06 13:17 */ @Slf4j @Data diff --git a/collector/src/main/java/com/zmops/open/collector/dispatch/entrance/zabbix/agent/TcpClientHandler.java b/collector/src/main/java/com/zmops/open/collector/dispatch/entrance/zabbix/agent/TcpClientHandler.java index b160fdb..4295dc2 100644 --- a/collector/src/main/java/com/zmops/open/collector/dispatch/entrance/zabbix/agent/TcpClientHandler.java +++ b/collector/src/main/java/com/zmops/open/collector/dispatch/entrance/zabbix/agent/TcpClientHandler.java @@ -1,18 +1,85 @@ package com.zmops.open.collector.dispatch.entrance.zabbix.agent; +import com.zmops.open.collector.dispatch.entrance.internal.CollectJobService; +import com.zmops.open.collector.dispatch.entrance.zabbix.protocol.bean.ZabbixProtocolType; import com.zmops.open.collector.dispatch.entrance.zabbix.protocol.bean.ZabbixResponse; +import com.zmops.open.common.entity.job.Configmap; +import com.zmops.open.common.entity.job.Job; +import com.zmops.open.common.entity.job.Metrics; +import com.zmops.open.common.service.AppDefineHouse; +import com.zmops.open.common.support.SpringContextHolder; +import com.zmops.open.common.util.GsonUtil; +import com.zmops.open.common.util.SnowFlakeIdGenerator; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; +import lombok.extern.slf4j.Slf4j; + +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; /** * @author nantian Zabbix protocol type */ +@Slf4j public class TcpClientHandler extends SimpleChannelInboundHandler { + private CollectJobService collectJobService; + private Set runningJobs; + + public TcpClientHandler() { + this.collectJobService = SpringContextHolder.getBean(CollectJobService.class); + this.runningJobs = new HashSet<>(8); + } @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, ZabbixResponse response) throws Exception { - + if (response != null && response.getType() == ZabbixProtocolType.ACTIVE_CHECKS + && response.getActiveChecks() != null) { + runningJobs.forEach(jobId -> { + collectJobService.cancelAsyncCollectJob(jobId); + }); + runningJobs.clear(); + for (ZabbixResponse.ActiveChecks metric : response.getActiveChecks()) { + try { + Map paramsMap = metric.getParamsMap(); + if (paramsMap.isEmpty()) { + continue; + } + // 构造采集任务Job实体 + Job appDefine = AppDefineHouse.getAppDefine(paramsMap.get(ZabbixResponse.APP)); + appDefine = GsonUtil.fromJson(GsonUtil.toJson(appDefine), Job.class); + long jobId = SnowFlakeIdGenerator.generateId(); + appDefine.setId(jobId); + // set itemId 10000 - 99999 in monitorId + appDefine.setMonitorId(metric.getItemid()); + appDefine.setInterval(30); + appDefine.setCyclic(true); + appDefine.setTimestamp(System.currentTimeMillis()); + List configmaps = appDefine.getConfigmap().stream().peek(config -> { + String value = paramsMap.get(config.getKey()); + config.setValue(value); + }).collect(Collectors.toList()); + appDefine.setConfigmap(configmaps); + // filter metric + List metrics = appDefine.getMetrics().stream().filter(item -> + item.getName().equals(paramsMap.get(ZabbixResponse.METRICS))) + .peek(item -> item.setPriority((byte) 0)) + .collect(Collectors.toList()); + if (metrics.isEmpty()) { + continue; + } + appDefine.setMetrics(metrics); + // 下发采集任务 + collectJobService.addAsyncCollectJob(appDefine); + runningJobs.add(jobId); + } catch (Exception e) { + log.error("add zabbix monitor job error {}", e.getMessage(), e); + } + } + } } @Override diff --git a/collector/src/main/java/com/zmops/open/collector/dispatch/entrance/zabbix/agent/ZabbixAgentClient.java b/collector/src/main/java/com/zmops/open/collector/dispatch/entrance/zabbix/agent/ZabbixAgentClient.java deleted file mode 100644 index 7798ecb..0000000 --- a/collector/src/main/java/com/zmops/open/collector/dispatch/entrance/zabbix/agent/ZabbixAgentClient.java +++ /dev/null @@ -1,61 +0,0 @@ -package com.zmops.open.collector.dispatch.entrance.zabbix.agent; - - -import com.zmops.open.collector.dispatch.entrance.zabbix.protocol.bean.ZabbixProtocolType; -import com.zmops.open.collector.dispatch.entrance.zabbix.protocol.bean.ZabbixRequest; -import io.netty.channel.Channel; - -import java.util.Collections; -import java.util.Random; -import java.util.UUID; - -/** - * @author nantian Zabbix protocol type - */ -public class ZabbixAgentClient { - - public static void main(String[] args) throws Exception { - - TcpClient zabbixAgent = new TcpClient("172.16.3.77", 10051); - zabbixAgent.start(); - - Channel channel = zabbixAgent.getChannel(); - - - ZabbixRequest request = new ZabbixRequest(); - request.setType(ZabbixProtocolType.ACTIVE_CHECKS); - request.setHost("ZabbixAgentTest"); - - channel.writeAndFlush(request); - zabbixAgent.shutdown(); - - Thread.sleep(2000); - - // Zabbix Server 会主动断开,每次提交数据都需要重新创建连接 - //========================================================== - - TcpClient zabbixAgent2 = new TcpClient("172.16.3.77", 10051); - zabbixAgent2.start(); - - Channel channel2 = zabbixAgent2.getChannel(); - - ZabbixRequest requestData = new ZabbixRequest(); - requestData.setType(ZabbixProtocolType.AGENT_DATA); - requestData.setHost("ZabbixAgentTest"); - // agent 启动时生成, 32位 - requestData.setSession(UUID.randomUUID().toString().replace("-", "")); - - Random random = new Random(); - - ZabbixRequest.AgentData data = new ZabbixRequest.AgentData(); - data.setItemid(43488); - data.setValue("" + random.nextInt(1000)); - data.setClock(System.currentTimeMillis() / 1000); - data.setNs(76808644); - - requestData.setAgentDataList(Collections.singletonList(data)); - - channel2.writeAndFlush(requestData); - zabbixAgent2.shutdown(); - } -} diff --git a/collector/src/main/java/com/zmops/open/collector/dispatch/entrance/zabbix/agent/ZabbixAgentService.java b/collector/src/main/java/com/zmops/open/collector/dispatch/entrance/zabbix/agent/ZabbixAgentService.java new file mode 100644 index 0000000..655de29 --- /dev/null +++ b/collector/src/main/java/com/zmops/open/collector/dispatch/entrance/zabbix/agent/ZabbixAgentService.java @@ -0,0 +1,146 @@ +package com.zmops.open.collector.dispatch.entrance.zabbix.agent; + + +import com.zmops.open.collector.dispatch.DispatchProperties; +import com.zmops.open.collector.dispatch.entrance.zabbix.protocol.bean.ZabbixProtocolType; +import com.zmops.open.collector.dispatch.entrance.zabbix.protocol.bean.ZabbixRequest; +import com.zmops.open.common.entity.message.CollectRep; +import com.zmops.open.common.util.CommonConstants; +import com.zmops.open.common.util.GsonUtil; +import io.netty.channel.Channel; +import lombok.extern.slf4j.Slf4j; +import org.springframework.boot.CommandLineRunner; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.stereotype.Component; + +import java.util.*; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +/** + * @author nantian Zabbix protocol type + */ +@Component +@ConditionalOnProperty(prefix = "collector.dispatch.entrance.zabbix", + name = "enabled", havingValue = "true") +@Slf4j +public class ZabbixAgentService implements CommandLineRunner { + + private final LinkedBlockingQueue metricsDataQueue; + private TcpClient zabbixAgent; + private String sessionId; + + private DispatchProperties.EntranceProperties.ZabbixProperties zabbixProperties; + + public ZabbixAgentService(DispatchProperties dispatchProperties) { + this.metricsDataQueue = new LinkedBlockingQueue<>(); + this.zabbixProperties = dispatchProperties.getEntrance().getZabbix(); + } + + @Override + public void run(String... args) throws Exception { + zabbixAgent = new TcpClient(zabbixProperties.getHost(), zabbixProperties.getPort()); + zabbixAgent.start(); + sessionId = UUID.randomUUID().toString().replace("-", ""); + Channel channel = zabbixAgent.getChannel(); + ZabbixRequest request = new ZabbixRequest(); + request.setType(ZabbixProtocolType.ACTIVE_CHECKS); + request.setHost(zabbixProperties.getAgentHost()); + channel.writeAndFlush(request); + Thread.sleep(2000); + + Runnable runnable = () -> { + while (!Thread.currentThread().isInterrupted()) { + try { + CollectRep.MetricsData metricsData = metricsDataQueue.poll(2, TimeUnit.SECONDS); + if (metricsData != null && metricsData.getCode() == CollectRep.Code.SUCCESS) { + Channel dataChannel = zabbixAgent.getChannel(); + if (dataChannel == null || !dataChannel.isActive()) { + zabbixAgent = new TcpClient(zabbixProperties.getHost(), zabbixProperties.getPort()); + zabbixAgent.start(); + sessionId = UUID.randomUUID().toString().replace("-", ""); + dataChannel = zabbixAgent.getChannel(); + } + ZabbixRequest requestData = new ZabbixRequest(); + requestData.setType(ZabbixProtocolType.AGENT_DATA); + requestData.setHost(zabbixProperties.getAgentHost()); + // agent 启动时生成, 32位 + requestData.setSession(sessionId); + ZabbixRequest.AgentData data = new ZabbixRequest.AgentData(); + data.setItemid((int) metricsData.getId()); + List> dataList = new LinkedList<>(); + metricsData.getValuesList().forEach(values -> { + Map dataMap = new HashMap<>(8); + for (int i = 0; i < metricsData.getFieldsCount(); i++) { + CollectRep.Field field = metricsData.getFields(i); + String value = values.getColumns(i); + if (!CommonConstants.NULL_VALUE.equals(value)) { + dataMap.put(field.getName(), value); + } + } + dataList.add(dataMap); + }); + String dataJson = GsonUtil.toJson(dataList); + data.setValue(dataJson); + data.setClock(System.currentTimeMillis() / 1000); + // todo 采集消费时间 + data.setNs(76808644); + requestData.setAgentDataList(Collections.singletonList(data)); + dataChannel.writeAndFlush(requestData); + } + } catch (Exception e) {} + } + }; + Executors.newSingleThreadExecutor().submit(runnable); + } + + public void sendMetricsData(CollectRep.MetricsData metricsData) { + this.metricsDataQueue.offer(metricsData); + } + + public static void main(String[] args) throws Exception { + + TcpClient zabbixAgent = new TcpClient("localhost", 10051); + zabbixAgent.start(); + + Channel channel = zabbixAgent.getChannel(); + + + ZabbixRequest request = new ZabbixRequest(); + request.setType(ZabbixProtocolType.ACTIVE_CHECKS); + request.setHost("ZabbixAgentTest"); + + channel.writeAndFlush(request); + zabbixAgent.shutdown(); + + Thread.sleep(2000); + + // Zabbix Server 会主动断开,每次提交数据都需要重新创建连接 + //========================================================== + + TcpClient zabbixAgent2 = new TcpClient("localhost", 10051); + zabbixAgent2.start(); + + Channel channel2 = zabbixAgent2.getChannel(); + + ZabbixRequest requestData = new ZabbixRequest(); + requestData.setType(ZabbixProtocolType.AGENT_DATA); + requestData.setHost("ZabbixAgentTest"); + // agent 启动时生成, 32位 + requestData.setSession(UUID.randomUUID().toString().replace("-", "")); + + Random random = new Random(); + + ZabbixRequest.AgentData data = new ZabbixRequest.AgentData(); + data.setItemid(44301); + data.setValue("" + random.nextInt(1000)); + data.setClock(System.currentTimeMillis() / 1000); + data.setNs(76808644); + + requestData.setAgentDataList(Collections.singletonList(data)); + + channel2.writeAndFlush(requestData); + zabbixAgent2.shutdown(); + } +} diff --git a/collector/src/main/java/com/zmops/open/collector/dispatch/entrance/zabbix/protocol/ZabbixProtocolDecoder.java b/collector/src/main/java/com/zmops/open/collector/dispatch/entrance/zabbix/protocol/ZabbixProtocolDecoder.java index 0d532d7..6ec4716 100644 --- a/collector/src/main/java/com/zmops/open/collector/dispatch/entrance/zabbix/protocol/ZabbixProtocolDecoder.java +++ b/collector/src/main/java/com/zmops/open/collector/dispatch/entrance/zabbix/protocol/ZabbixProtocolDecoder.java @@ -51,8 +51,6 @@ protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteB return; } - System.out.println("ZabbixServer 响应数据为:" + payload); - // Parse content and add to list ZabbixResponse response = requestParser.fromJson(payload, ZabbixResponse.class); list.add(response); diff --git a/collector/src/main/java/com/zmops/open/collector/dispatch/entrance/zabbix/protocol/bean/ZabbixResponse.java b/collector/src/main/java/com/zmops/open/collector/dispatch/entrance/zabbix/protocol/bean/ZabbixResponse.java index 6204c0b..db2408d 100644 --- a/collector/src/main/java/com/zmops/open/collector/dispatch/entrance/zabbix/protocol/bean/ZabbixResponse.java +++ b/collector/src/main/java/com/zmops/open/collector/dispatch/entrance/zabbix/protocol/bean/ZabbixResponse.java @@ -18,18 +18,32 @@ package com.zmops.open.collector.dispatch.entrance.zabbix.protocol.bean; -import lombok.Builder; -import lombok.Data; -import lombok.Getter; +import lombok.*; +import lombok.extern.slf4j.Slf4j; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** * @author nantian Zabbix protocol type */ @Data +@AllArgsConstructor +@NoArgsConstructor +@Slf4j public class ZabbixResponse { + public static final String APP = "app"; + public static final String METRICS = "metrics"; + public static final String HOST = "host"; + public static final String PORT = "port"; + public static final String TIMEOUT = "timeout"; + public static final String DATABASE = "database"; + public static final String USERNAME = "username"; + public static final String PASSWORD = "password"; + public static final String URL = "url"; + /** * Protocol type */ @@ -40,12 +54,47 @@ public class ZabbixResponse { private AgentData agentData; @Data + @AllArgsConstructor + @NoArgsConstructor @Builder public static class ActiveChecks { private String key; - private int delay; + private Long itemid; + private String delay; private int mtime; private int lastlogsize; + + public Map getParamsMap() { + Map paramsMap = new HashMap<>(8); + if (key != null && key.length() > 0 && key.contains("[") && key.contains("]")) { + try { + int startIndex = key.indexOf('['); + int endIndex = key.indexOf(']'); + String preStr = key.substring(0, startIndex); + String endStr = key.substring(startIndex + 1, endIndex); + String[] params = endStr.split(","); + String[] keys = preStr.split("\\."); + if (keys.length != 2 || params.length != 7) { + log.error("zabbix metric key {} do not meet the requirements. ", key); + } else { + paramsMap.put(APP, keys[0]); + paramsMap.put(METRICS, keys[1]); + paramsMap.put(HOST, params[0]); + paramsMap.put(PORT, params[1]); + paramsMap.put(TIMEOUT, params[2]); + paramsMap.put(DATABASE, params[3]); + paramsMap.put(USERNAME, params[4]); + paramsMap.put(PASSWORD, params[5]); + paramsMap.put(URL, params[6]); + } + } catch (Exception e) { + log.error("zabbix metric key {} do not meet the requirements. ", key); + } + } else { + log.error("zabbix metric key {} do not meet the requirements. ", key); + } + return paramsMap; + } } @Getter diff --git a/collector/src/main/java/com/zmops/open/collector/dispatch/entrance/zabbix/protocol/bean/ZabbixResponseJsonDeserializer.java b/collector/src/main/java/com/zmops/open/collector/dispatch/entrance/zabbix/protocol/bean/ZabbixResponseJsonDeserializer.java index ab3888a..eec4481 100644 --- a/collector/src/main/java/com/zmops/open/collector/dispatch/entrance/zabbix/protocol/bean/ZabbixResponseJsonDeserializer.java +++ b/collector/src/main/java/com/zmops/open/collector/dispatch/entrance/zabbix/protocol/bean/ZabbixResponseJsonDeserializer.java @@ -18,12 +18,12 @@ package com.zmops.open.collector.dispatch.entrance.zabbix.protocol.bean; -import com.google.gson.JsonDeserializationContext; -import com.google.gson.JsonDeserializer; -import com.google.gson.JsonElement; -import com.google.gson.JsonParseException; +import com.google.gson.*; +import com.google.gson.reflect.TypeToken; import java.lang.reflect.Type; +import java.util.List; +import java.util.Map; /** * Deserializer Zabbix Response to JSON @@ -35,9 +35,15 @@ public class ZabbixResponseJsonDeserializer implements JsonDeserializer>(){}.getType(); + List activeChecks = new Gson().fromJson(jsonElement, checksType); + response.setActiveChecks(activeChecks); + } return response; } } diff --git a/collector/src/main/java/com/zmops/open/collector/dispatch/timer/WheelTimerTask.java b/collector/src/main/java/com/zmops/open/collector/dispatch/timer/WheelTimerTask.java index c971e0b..7eb39e0 100644 --- a/collector/src/main/java/com/zmops/open/collector/dispatch/timer/WheelTimerTask.java +++ b/collector/src/main/java/com/zmops/open/collector/dispatch/timer/WheelTimerTask.java @@ -72,8 +72,10 @@ private void initJobMetrics(Job job) { String decodeValue = AesUtil.aesDecode(String.valueOf(item.getValue())); if (decodeValue == null) { log.error("Aes Decode value {} error.", item.getValue()); + item.setValue(item.getValue()); + } else { + item.setValue(decodeValue); } - item.setValue(decodeValue); } else if (item.getValue() != null && item.getValue() instanceof String) { item.setValue(((String) item.getValue()).trim()); } diff --git a/common/src/main/java/com/zmops/open/common/service/AppDefineHouse.java b/common/src/main/java/com/zmops/open/common/service/AppDefineHouse.java new file mode 100644 index 0000000..19fb447 --- /dev/null +++ b/common/src/main/java/com/zmops/open/common/service/AppDefineHouse.java @@ -0,0 +1,34 @@ +package com.zmops.open.common.service; + +import com.zmops.open.common.entity.job.Job; +import com.zmops.open.common.entity.manager.ParamDefine; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * @author tom + * @date 2023/3/2 15:36 + */ +public class AppDefineHouse { + + private static final Map appDefines = new ConcurrentHashMap<>(); + private static final Map> paramDefines = new ConcurrentHashMap<>(); + + public static void flushDefine(Map defines, Map> params) { + appDefines.clear(); + appDefines.putAll(defines); + paramDefines.clear(); + paramDefines.putAll(params); + } + + public static Job getAppDefine(String app) { + return appDefines.get(app); + } + + public static List getParamDefines(String app) { + return paramDefines.get(app); + } + +} diff --git a/common/src/main/java/com/zmops/open/common/util/AesUtil.java b/common/src/main/java/com/zmops/open/common/util/AesUtil.java index 80e3fba..ba5328c 100644 --- a/common/src/main/java/com/zmops/open/common/util/AesUtil.java +++ b/common/src/main/java/com/zmops/open/common/util/AesUtil.java @@ -114,13 +114,13 @@ public static String aesDecode(String content, String decryptKey) { byte[] byteDecode = cipher.doFinal(bytesContent); return new String(byteDecode, StandardCharsets.UTF_8); } catch (NoSuchAlgorithmException e) { - log.error("没有指定的加密算法::{}", e.getMessage(),e); + log.error("没有指定的加密算法::{}", e.getMessage()); } catch (IllegalBlockSizeException e) { - log.error("非法的块大小::{}", e.getMessage(),e); + log.warn("非法的块大小::{}", e.getMessage()); } catch (NullPointerException e) { - log.error("秘钥解析空指针异常::{}", e.getMessage(),e); + log.error("秘钥解析空指针异常::{}", e.getMessage()); } catch (Exception e) { - log.error("秘钥AES解析出现未知错误::{}", e.getMessage(),e); + log.error("秘钥AES解析出现未知错误::{}", e.getMessage()); } return content; } diff --git a/common/src/main/java/com/zmops/open/common/util/TimePeriodUtil.java b/common/src/main/java/com/zmops/open/common/util/TimePeriodUtil.java new file mode 100644 index 0000000..562efe2 --- /dev/null +++ b/common/src/main/java/com/zmops/open/common/util/TimePeriodUtil.java @@ -0,0 +1,27 @@ +package com.zmops.open.common.util; + +import java.time.Duration; +import java.time.Period; +import java.time.temporal.TemporalAmount; + +/** + * time util + * + * @author tom + * @date 2023/3/5 16:45 + */ +public class TimePeriodUtil { + + /** + * parse tokenTime to TemporalAmount + * @param tokenTime eg: "1m", "5M", "3D", "30m", "2h", "1Y", "3W" + * @return TemporalAmount + */ + public static TemporalAmount parseTokenTime(String tokenTime) { + if (Character.isUpperCase(tokenTime.charAt(tokenTime.length() - 1))) { + return Period.parse("P" + tokenTime); + } else { + return Duration.parse("PT" + tokenTime); + } + } +} diff --git a/manager/src/main/java/com/zmops/open/manager/Manager.java b/manager/src/main/java/com/zmops/open/manager/Manager.java index 049c738..4390136 100644 --- a/manager/src/main/java/com/zmops/open/manager/Manager.java +++ b/manager/src/main/java/com/zmops/open/manager/Manager.java @@ -22,6 +22,7 @@ import org.springframework.boot.autoconfigure.domain.EntityScan; import org.springframework.data.jpa.repository.config.EnableJpaAuditing; import org.springframework.data.jpa.repository.config.EnableJpaRepositories; +import org.springframework.scheduling.annotation.EnableScheduling; /** * @author tomsun28 from hertzbeat @@ -30,6 +31,7 @@ @SpringBootApplication @EnableJpaAuditing +@EnableScheduling @EnableJpaRepositories(basePackages = {"com.zmops.open"}) @EntityScan(basePackages = {"com.zmops.open"}) public class Manager { diff --git a/manager/src/main/java/com/zmops/open/manager/service/impl/AppServiceImpl.java b/manager/src/main/java/com/zmops/open/manager/service/impl/AppServiceImpl.java index e5486ac..fea10c8 100644 --- a/manager/src/main/java/com/zmops/open/manager/service/impl/AppServiceImpl.java +++ b/manager/src/main/java/com/zmops/open/manager/service/impl/AppServiceImpl.java @@ -19,6 +19,7 @@ import com.zmops.open.common.entity.job.Job; import com.zmops.open.common.entity.job.Metrics; +import com.zmops.open.common.service.AppDefineHouse; import com.zmops.open.manager.dao.ParamDefineDao; import com.zmops.open.manager.pojo.dto.Hierarchy; import com.zmops.open.manager.pojo.dto.ParamDefineDto; @@ -262,5 +263,8 @@ public void run(String... args) throws Exception { throw e; } } + + // store common + AppDefineHouse.flushDefine(appDefines, paramDefines); } } diff --git a/manager/src/main/resources/application.yml b/manager/src/main/resources/application.yml index fc14643..1886609 100644 --- a/manager/src/main/resources/application.yml +++ b/manager/src/main/resources/application.yml @@ -79,6 +79,7 @@ spring: jpa: hibernate: ddl-auto: update + show-sql: on # Not Require, Please config if you need email notify # 非必填:不使用邮箱作为警告通知可以去掉spring.mail配置 @@ -104,6 +105,7 @@ warehouse: store: inner: enabled: true + expire-time: 14D td-engine: enabled: false driver-class-name: com.taosdata.jdbc.rs.RestfulDriver @@ -131,3 +133,12 @@ warehouse: host: 127.0.0.1 port: 6379 password: 123456 + +collector: + dispatch: + entrance: + zabbix: + enabled: true + host: localhost + port: 10051 + agent-host: ZabbixAgentTest diff --git a/warehouse/src/main/java/com/zmops/open/warehouse/config/WarehouseProperties.java b/warehouse/src/main/java/com/zmops/open/warehouse/config/WarehouseProperties.java index d674bee..a235697 100644 --- a/warehouse/src/main/java/com/zmops/open/warehouse/config/WarehouseProperties.java +++ b/warehouse/src/main/java/com/zmops/open/warehouse/config/WarehouseProperties.java @@ -242,6 +242,11 @@ public static class InnerProperties { */ private boolean enabled = true; + /** + * save data expire time(ms) + */ + private String expireTime = "7D"; + public boolean isEnabled() { return enabled; } @@ -250,6 +255,13 @@ public void setEnabled(boolean enabled) { this.enabled = enabled; } + public String getExpireTime() { + return expireTime; + } + + public void setExpireTime(String expireTime) { + this.expireTime = expireTime; + } } public static class InfluxdbProperties { diff --git a/warehouse/src/main/java/com/zmops/open/warehouse/dao/HistoryDao.java b/warehouse/src/main/java/com/zmops/open/warehouse/dao/HistoryDao.java index 47d84c5..144219b 100644 --- a/warehouse/src/main/java/com/zmops/open/warehouse/dao/HistoryDao.java +++ b/warehouse/src/main/java/com/zmops/open/warehouse/dao/HistoryDao.java @@ -11,4 +11,9 @@ */ public interface HistoryDao extends JpaRepository, JpaSpecificationExecutor { + /** + * delete history before expireTime + * @param expireTime expireTime + */ + void deleteHistoriesByTimeBefore(Long expireTime); } diff --git a/warehouse/src/main/java/com/zmops/open/warehouse/store/HistoryInnerDataStorage.java b/warehouse/src/main/java/com/zmops/open/warehouse/store/HistoryInnerDataStorage.java index 8a7abd3..d45933f 100644 --- a/warehouse/src/main/java/com/zmops/open/warehouse/store/HistoryInnerDataStorage.java +++ b/warehouse/src/main/java/com/zmops/open/warehouse/store/HistoryInnerDataStorage.java @@ -6,19 +6,25 @@ import com.zmops.open.common.entity.warehouse.History; import com.zmops.open.common.queue.CommonDataQueue; import com.zmops.open.common.util.CommonConstants; +import com.zmops.open.common.util.TimePeriodUtil; import com.zmops.open.warehouse.WarehouseWorkerPool; import com.zmops.open.warehouse.config.WarehouseProperties; import com.zmops.open.warehouse.dao.HistoryDao; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.math.NumberUtils; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.data.domain.Sort; import org.springframework.data.jpa.domain.Specification; +import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; import javax.persistence.criteria.CriteriaBuilder; import javax.persistence.criteria.Predicate; import java.math.BigDecimal; import java.math.RoundingMode; +import java.time.Duration; +import java.time.ZonedDateTime; +import java.time.temporal.TemporalAmount; import java.util.*; /** @@ -34,17 +40,44 @@ public class HistoryInnerDataStorage extends AbstractHistoryDataStorage { private HistoryDao historyDao; + private WarehouseProperties.StoreProperties.InnerProperties innerProperties; + public HistoryInnerDataStorage(WarehouseWorkerPool workerPool, WarehouseProperties properties, HistoryDao historyDao, CommonDataQueue commonDataQueue) { super(workerPool, properties, commonDataQueue); - + this.innerProperties = properties.getStore().getInner(); this.serverAvailable = true; this.historyDao = historyDao; this.startStorageData("warehouse-inner-data-storage", isServerAvailable()); } + @Scheduled(cron = "0 0 23 * * ?") + public void expiredDataCleaner() { + String expireTimeStr = innerProperties.getExpireTime(); + long expireTime = 0; + try { + if (NumberUtils.isParsable(expireTimeStr)) { + expireTime = NumberUtils.toLong(expireTimeStr); + expireTime = (ZonedDateTime.now().toEpochSecond() + expireTime) * 1000; + } else { + TemporalAmount temporalAmount = TimePeriodUtil.parseTokenTime(expireTimeStr); + ZonedDateTime dateTime = ZonedDateTime.now().minus(temporalAmount); + expireTime = dateTime.toEpochSecond() * 1000; + } + } catch (Exception e) { + log.error("expiredDataCleaner time error: {}. use default expire time to clean: 7d", e.getMessage()); + ZonedDateTime dateTime = ZonedDateTime.now().minus(Duration.ofDays(7)); + expireTime = dateTime.toEpochSecond() * 1000; + } + try { + historyDao.deleteHistoriesByTimeBefore(expireTime); + } catch (Exception e) { + log.error("expiredDataCleaner database error: {}.", e.getMessage()); + } + } + @Override void saveData(CollectRep.MetricsData metricsData) { if (metricsData.getCode() != CollectRep.Code.SUCCESS) { @@ -127,6 +160,17 @@ public Map> getHistoryMetricData(Long monitorId, String app, Predicate predicateMonitorInstance = criteriaBuilder.equal(root.get("instance"), instance); andList.add(predicateMonitorInstance); } + if (history != null) { + try { + TemporalAmount temporalAmount = Duration.parse("PT" + history); + ZonedDateTime dateTime = ZonedDateTime.now().minus(temporalAmount); + long timeBefore = dateTime.toEpochSecond() * 1000; + Predicate timePredicate = criteriaBuilder.ge(root.get("time"), timeBefore); + andList.add(timePredicate); + } catch (Exception e) { + log.error(e.getMessage()); + } + } Predicate[] predicates = new Predicate[andList.size()]; Predicate predicate = criteriaBuilder.and(andList.toArray(predicates)); return query.where(predicate).getRestriction();