Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support zabbix agent #32

Merged
merged 4 commits into from
Mar 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import com.google.gson.Gson;
import com.google.gson.JsonElement;
import com.zmops.open.collector.dispatch.entrance.zabbix.agent.ZabbixAgentService;
import com.zmops.open.collector.dispatch.timer.Timeout;
import com.zmops.open.collector.dispatch.timer.TimerDispatch;
import com.zmops.open.collector.dispatch.timer.WheelTimerTask;
Expand Down Expand Up @@ -64,6 +65,9 @@ public class CommonDispatcher implements MetricsTaskDispatch, CollectDataDispatc
* 触发子任务最大数量
*/
private static final int MAX_SUB_TASK_NUM = 50;

private static final long ITEM_ID_START = 10000L;
private static final long ITEM_ID_END = 99999L;
private static final Gson GSON = new Gson();
/**
* Priority queue of index group collection tasks
Expand All @@ -80,6 +84,7 @@ public class CommonDispatcher implements MetricsTaskDispatch, CollectDataDispatc
* 采集数据导出器
*/
private CommonDataQueue commonDataQueue;
private ZabbixAgentService zabbixAgentService;
/**
* Metric group task and start time mapping map
* 指标组任务与开始时间映射map
Expand All @@ -91,11 +96,13 @@ public class CommonDispatcher implements MetricsTaskDispatch, CollectDataDispatc
public CommonDispatcher(MetricsCollectorQueue jobRequestQueue,
TimerDispatch timerDispatch,
CommonDataQueue commonDataQueue,
ZabbixAgentService zabbixAgentService,
WorkerPool workerPool,
List<UnitConvert> unitConvertList) {
this.commonDataQueue = commonDataQueue;
this.jobRequestQueue = jobRequestQueue;
this.timerDispatch = timerDispatch;
this.zabbixAgentService = zabbixAgentService;
this.unitConvertList = unitConvertList;
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(2, 2, 1,
TimeUnit.SECONDS,
Expand Down Expand Up @@ -206,7 +213,12 @@ public void dispatchCollectData(Timeout timeout, Metrics metrics, CollectRep.Met
if (job.isCyclic()) {
// If it is an asynchronous periodic cyclic task, directly send the collected data of the indicator group to the message middleware
// 若是异步的周期性循环任务,直接发送指标组的采集数据到消息中间件
commonDataQueue.sendMetricsData(metricsData);
if (job.getMonitorId() <= ITEM_ID_END && job.getMonitorId() >= ITEM_ID_START) {
// from zabbix send data to zabbix
zabbixAgentService.sendMetricsData(metricsData);
} else {
commonDataQueue.sendMetricsData(metricsData);
}
if (log.isDebugEnabled()) {
log.debug("Cyclic Job: {}",metricsData.getMetrics());
for (CollectRep.ValueRow valueRow : metricsData.getValuesList()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
* 调度分发任务配置属性
*
* @author tomsun28 from <a href="https://github.com/dromara/hertzbeat">hertzbeat</a>
* @date 2021/10/16 14:54
* @date 2023/03/06 13:17
*/
@Component
@ConfigurationProperties(prefix = "collector.dispatch")
Expand Down Expand Up @@ -72,6 +72,11 @@ public static class EntranceProperties {
*/
private EtcdProperties etcd;

/**
* zabbix properties
*/
private ZabbixProperties zabbix;

public EtcdProperties getEtcd() {
return etcd;
}
Expand All @@ -80,13 +85,21 @@ public void setEtcd(EtcdProperties etcd) {
this.etcd = etcd;
}

public ZabbixProperties getZabbix() {
return zabbix;
}

public void setZabbix(ZabbixProperties zabbix) {
this.zabbix = zabbix;
}

public static class EtcdProperties {

/**
* Whether etcd scheduling is started
* etcd调度是否启动
*/
private boolean enabled = true;
private boolean enabled = false;

/**
* etcd's connection endpoint url
Expand Down Expand Up @@ -194,6 +207,61 @@ public void setJobDir(String jobDir) {
this.jobDir = jobDir;
}
}

public static class ZabbixProperties {
/**
* Whether zabbix scheduling is started
* zabbix调度是否启动
*/
private boolean enabled = false;

/**
* zabbix server host
*/
private String host;

/**
* zabbix server port
*/
private Integer port = 10051;

/**
* zabbix agent host name
*/
private String agentHost = "ArgusDBM";

public boolean isEnabled() {
return enabled;
}

public void setEnabled(boolean enabled) {
this.enabled = enabled;
}

public String getHost() {
return host;
}

public void setHost(String host) {
this.host = host;
}

public Integer getPort() {
return port;
}

public void setPort(Integer port) {
this.port = port;
}

public String getAgentHost() {
return agentHost;
}

public void setAgentHost(String agentHost) {
this.agentHost = agentHost;
}
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
* 指标组采集
*
* @author tomsun28 from <a href="https://github.com/dromara/hertzbeat">hertzbeat</a>
* @date 2021/10/10 15:35
* @date 2023/03/06 13:17
*/
@Slf4j
@Data
Expand Down
Original file line number Diff line number Diff line change
@@ -1,18 +1,85 @@
package com.zmops.open.collector.dispatch.entrance.zabbix.agent;

import com.zmops.open.collector.dispatch.entrance.internal.CollectJobService;
import com.zmops.open.collector.dispatch.entrance.zabbix.protocol.bean.ZabbixProtocolType;
import com.zmops.open.collector.dispatch.entrance.zabbix.protocol.bean.ZabbixResponse;
import com.zmops.open.common.entity.job.Configmap;
import com.zmops.open.common.entity.job.Job;
import com.zmops.open.common.entity.job.Metrics;
import com.zmops.open.common.service.AppDefineHouse;
import com.zmops.open.common.support.SpringContextHolder;
import com.zmops.open.common.util.GsonUtil;
import com.zmops.open.common.util.SnowFlakeIdGenerator;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import lombok.extern.slf4j.Slf4j;

import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

/**
* @author nantian Zabbix protocol type
*/
@Slf4j
public class TcpClientHandler extends SimpleChannelInboundHandler<ZabbixResponse> {

private CollectJobService collectJobService;
private Set<Long> runningJobs;

public TcpClientHandler() {
this.collectJobService = SpringContextHolder.getBean(CollectJobService.class);
this.runningJobs = new HashSet<>(8);
}

@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, ZabbixResponse response) throws Exception {

if (response != null && response.getType() == ZabbixProtocolType.ACTIVE_CHECKS
&& response.getActiveChecks() != null) {
runningJobs.forEach(jobId -> {
collectJobService.cancelAsyncCollectJob(jobId);
});
runningJobs.clear();
for (ZabbixResponse.ActiveChecks metric : response.getActiveChecks()) {
try {
Map<String, String> paramsMap = metric.getParamsMap();
if (paramsMap.isEmpty()) {
continue;
}
// 构造采集任务Job实体
Job appDefine = AppDefineHouse.getAppDefine(paramsMap.get(ZabbixResponse.APP));
appDefine = GsonUtil.fromJson(GsonUtil.toJson(appDefine), Job.class);
long jobId = SnowFlakeIdGenerator.generateId();
appDefine.setId(jobId);
// set itemId 10000 - 99999 in monitorId
appDefine.setMonitorId(metric.getItemid());
appDefine.setInterval(30);
appDefine.setCyclic(true);
appDefine.setTimestamp(System.currentTimeMillis());
List<Configmap> configmaps = appDefine.getConfigmap().stream().peek(config -> {
String value = paramsMap.get(config.getKey());
config.setValue(value);
}).collect(Collectors.toList());
appDefine.setConfigmap(configmaps);
// filter metric
List<Metrics> metrics = appDefine.getMetrics().stream().filter(item ->
item.getName().equals(paramsMap.get(ZabbixResponse.METRICS)))
.peek(item -> item.setPriority((byte) 0))
.collect(Collectors.toList());
if (metrics.isEmpty()) {
continue;
}
appDefine.setMetrics(metrics);
// 下发采集任务
collectJobService.addAsyncCollectJob(appDefine);
runningJobs.add(jobId);
} catch (Exception e) {
log.error("add zabbix monitor job error {}", e.getMessage(), e);
}
}
}
}

@Override
Expand Down

This file was deleted.

Loading