Skip to content

Commit

Permalink
Merge pull request #360 from wangyu096/github_feature/agent_status_fa…
Browse files Browse the repository at this point in the history
…llback

feature: 强依赖Agent 状态的逻辑优化,提供降级处理机制 #226
  • Loading branch information
jsonwan authored Oct 25, 2021
2 parents a265edc + b19a1af commit 0be41df
Show file tree
Hide file tree
Showing 12 changed files with 262 additions and 103 deletions.
1 change: 1 addition & 0 deletions src/backend/commons/gse-sdk/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ dependencies {
implementation 'org.apache.commons:commons-lang3'
implementation 'org.apache.commons:commons-collections4'
implementation 'org.apache.httpcomponents:httpclient'
implementation 'io.micrometer:micrometer-registry-prometheus'
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.junit.jupiter:junit-jupiter'
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*
* Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available.
*
* Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved.
*
* BK-JOB蓝鲸智云作业平台 is licensed under the MIT License.
*
* License for BK-JOB蓝鲸智云作业平台:
* --------------------------------------------------------------------
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
* to permit persons to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of
* the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
* THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
* CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/

package com.tencent.bk.job.common.gse.constants;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonValue;

/**
 * Agent status. Serialized by Jackson as its integer code ({@code @JsonValue})
 * and deserialized through {@link #valOf(int)} ({@code @JsonCreator}).
 */
public enum AgentStatusEnum {
    /**
     * Abnormal
     */
    NOT_ALIVE(0),
    /**
     * Normal
     */
    ALIVE(1),
    /**
     * Unknown
     */
    UNKNOWN(2);

    @JsonValue
    private final int status;

    AgentStatusEnum(int code) {
        this.status = code;
    }

    /**
     * Finds the constant carrying the given integer code.
     *
     * @param status integer status code
     * @return the matching constant, or {@code null} when no constant has that code
     */
    @JsonCreator
    public static AgentStatusEnum valOf(int status) {
        AgentStatusEnum matched = null;
        AgentStatusEnum[] candidates = values();
        // Stop scanning as soon as the first constant with this code is found
        for (int i = 0; i < candidates.length && matched == null; i++) {
            AgentStatusEnum candidate = candidates[i];
            if (candidate.status == status) {
                matched = candidate;
            }
        }
        return matched;
    }

    /**
     * @return the integer code of this status
     */
    public int getValue() {
        return this.status;
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
/*
* Tencent is pleased to support the open source community by making BK-JOB蓝鲸智云作业平台 available.
*
* Copyright (C) 2021 THL A29 Limited, a Tencent company. All rights reserved.
*
* BK-JOB蓝鲸智云作业平台 is licensed under the MIT License.
*
* License for BK-JOB蓝鲸智云作业平台:
* --------------------------------------------------------------------
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated
* documentation files (the "Software"), to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and
* to permit persons to whom the Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or substantial portions of
* the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO
* THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF
* CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
* IN THE SOFTWARE.
*/

package com.tencent.bk.job.common.gse.constants;

/**
* Constants shared by GSE-related code.
*/
public interface GseConstants {
/**
* Cloud area ID of the direct-connect cloud area
*/
int DEFAULT_CLOUD_ID = 0;

/**
* Name prefix of the GSE API metrics
*/
String GSE_API_METRICS_NAME_PREFIX = "gse.api";
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,19 +27,24 @@
import com.fasterxml.jackson.annotation.JsonProperty;
import lombok.Data;

/**
* Data holder for one agent status entry; accessors are generated by Lombok's {@code @Data}.
*
* @since 11/11/2019 17:54
*/
@Data
public class AgentStatusDTO {
/**
* Bound to the JSON property "businessid" (presumably a business ID - confirm against the GSE response)
*/
@JsonProperty("businessid")
private String businessId;

/**
* Whether the Agent exists. 1: exists, 0: does not exist
*/
private Integer exist;

/**
* IP address (presumably of the agent host - confirm)
*/
private String ip;

/**
* Cloud area ID
*/
private String region;

/**
* Agent version
*/
private String version;
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,15 @@
import com.tencent.bk.gse.cacheapi.CacheIpInfo;
import com.tencent.bk.gse.cacheapi.CacheUser;
import com.tencent.bk.job.common.gse.config.GseConfig;
import com.tencent.bk.job.common.gse.constants.GseConstants;
import com.tencent.bk.job.common.gse.model.AgentStatusDTO;
import com.tencent.bk.job.common.gse.sdk.GseCacheClient;
import com.tencent.bk.job.common.gse.sdk.GseCacheClientFactory;
import com.tencent.bk.job.common.util.AgentUtils;
import com.tencent.bk.job.common.gse.util.AgentUtils;
import com.tencent.bk.job.common.util.ConcurrencyUtil;
import com.tencent.bk.job.common.util.Utils;
import com.tencent.bk.job.common.util.json.JsonUtils;
import io.micrometer.core.instrument.MeterRegistry;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;

Expand All @@ -45,26 +47,19 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

@Slf4j
public class QueryAgentStatusClientImpl implements QueryAgentStatusClient {

private final GseCacheClientFactory gseCacheClientFactory;
private final GseConfig gseConfig;
private final MeterRegistry meterRegistry;

public QueryAgentStatusClientImpl(GseConfig gseConfig) {
public QueryAgentStatusClientImpl(GseConfig gseConfig, MeterRegistry meterRegistry) {
this.gseConfig = gseConfig;
gseCacheClientFactory = new GseCacheClientFactory(gseConfig);
}

/**
* 解析agent状态 预期解析的文本: {"businessid":"","exist":1,"ip":"1.1.1.1","region":"1","version":"NULL"}
*
* @return
*/
private static int parseCacheAgentStatus(String statusStr) {
AgentStatusDTO agentStatus = JsonUtils.fromJson(statusStr, AgentStatusDTO.class);
return agentStatus.getExist();
this.gseCacheClientFactory = new GseCacheClientFactory(gseConfig);
this.meterRegistry = meterRegistry;
}

@Override
Expand Down Expand Up @@ -98,31 +93,65 @@ public Map<String, AgentStatus> batchGetAgentStatus(List<String> ips) {

public Map<String, AgentStatus> batchGetAgentStatusWithoutLimit(Collection<String> ips) {
Map<String, AgentStatus> resultMap = new HashMap<>();
GseCacheClient gseClient = gseCacheClientFactory.getClient();
if (null == gseClient) {
log.error("get GSE cache connection failed");
AgentStatusResponse response = queryAgentStatusFromCacheApi(ips);
if (response == null) {
return resultMap;
}
try {
List<CacheIpInfo> ipInfoList = new ArrayList<>();
for (String ip : ips) {
String[] ipInfo = ip.split(":");
if (ipInfo.length != 2) {
log.warn("Request ip format error! IP: {}", ip);
continue;
}
Map<String, String> responseMap = response.getResult();
for (Map.Entry<String, String> item : responseMap.entrySet()) {
String[] ipInfo = item.getKey().split(":");
if (ipInfo.length != 2) {
log.error("Query Gse agent not return businessId:ip: {}", item.getKey());
continue;
}

CacheIpInfo cacheIpInfo = new CacheIpInfo();
String sourceStr = "1".equals(ipInfo[0]) ? "0" : ipInfo[0];
cacheIpInfo.setPlatId(sourceStr);
cacheIpInfo.setIp(ipInfo[1]);
ipInfoList.add(cacheIpInfo);
AgentStatus agentStatus = new AgentStatus();
agentStatus.ip = ipInfo[1];
agentStatus.status = parseCacheAgentStatus(item.getValue());
agentStatus.cloudAreaId = Utils.tryParseInt(ipInfo[0]);
resultMap.put(agentStatus.cloudAreaId + ":" + agentStatus.ip, agentStatus);
}
return resultMap;
}

/**
 * Parses the agent status from one status text.
 * Expected text: {"businessid":"","exist":1,"ip":"1.1.1.1","region":"1","version":"NULL"}
 *
 * @param statusStr JSON text describing a single agent
 * @return the value of the "exist" field; 0 (the "does not exist" code documented on
 *         AgentStatusDTO.exist) when parsing yields no object or the field is absent
 */
private static int parseCacheAgentStatus(String statusStr) {
    AgentStatusDTO agentStatus = JsonUtils.fromJson(statusStr, AgentStatusDTO.class);
    // getExist() returns a boxed Integer; unboxing a missing value would throw a
    // NullPointerException, so fall back to the "does not exist" code instead.
    if (agentStatus == null || agentStatus.getExist() == null) {
        return 0;
    }
    return agentStatus.getExist();
}


private AgentStatusResponse queryAgentStatusFromCacheApi(Collection<String> ips) {
GseCacheClient gseClient = gseCacheClientFactory.getClient();
if (null == gseClient) {
log.error("Get GSE cache client connection failed");
return null;
}

List<CacheIpInfo> ipInfoList = new ArrayList<>();
for (String ip : ips) {
String[] ipInfo = ip.split(":");
if (ipInfo.length != 2) {
log.warn("Request ip format error! IP: {}", ip);
continue;
}
if (ipInfoList.isEmpty()) {
return resultMap;
}

CacheIpInfo cacheIpInfo = new CacheIpInfo();
String cloudAreaIdStr = "1".equals(ipInfo[0]) ?
String.valueOf(GseConstants.DEFAULT_CLOUD_ID) : ipInfo[0];
cacheIpInfo.setPlatId(cloudAreaIdStr);
cacheIpInfo.setIp(ipInfo[1]);
ipInfoList.add(cacheIpInfo);
}
if (ipInfoList.isEmpty()) {
return null;
}

long start = System.currentTimeMillis();
String status = "ok";
try {
CacheUser user = new CacheUser();
user.setUser("bitmap");
user.setPassword("bitmap");
Expand All @@ -131,32 +160,21 @@ public Map<String, AgentStatus> batchGetAgentStatusWithoutLimit(Collection<Strin
request.setUser(user);
request.setIpinfos(ipInfoList);

log.debug("queryAgentStatus request: " + request);
Long startTime = System.currentTimeMillis();
log.debug("QueryAgentStatus request: {}", request);
AgentStatusResponse response = gseClient.getCacheClient().quireAgentStatus(request);
Long endTime = System.currentTimeMillis();
log.debug("queryAgentStatus response: " + response);
log.info("batchGetAgentStatus {} ips, time consuming:{}ms", ips.size(), (endTime - startTime));

Map<String, String> responseMap = response.getResult();
for (Map.Entry<String, String> item : responseMap.entrySet()) {
String[] ipInfo = item.getKey().split(":");
if (ipInfo.length != 2) {
log.error("query Gse agent not return businessId:ip: {}", item.getKey());
continue;
}

AgentStatus agentStatus = new AgentStatus();
agentStatus.ip = ipInfo[1];
agentStatus.status = parseCacheAgentStatus(item.getValue());
agentStatus.cloudAreaId = Utils.tryParseInt(ipInfo[0]);
resultMap.put(agentStatus.cloudAreaId + ":" + agentStatus.ip, agentStatus);
}
return resultMap;
} catch (Exception e) {
log.error("gse request failed", e);
return resultMap;
log.debug("QueryAgentStatus response: {}", response);
return response;
} catch (Throwable e) {
log.error("QueryAgentStatus error", e);
status = "error";
return null;
} finally {
long end = System.currentTimeMillis();
log.info("BatchGetAgentStatus {} ips, cost: {}ms", ips.size(), (end - start));
// start/end come from System.currentTimeMillis(), so the elapsed value is in milliseconds
// and must be recorded with the matching unit.
if (this.meterRegistry != null) {
    meterRegistry.timer(GseConstants.GSE_API_METRICS_NAME_PREFIX, "api_name", "quireAgentStatus",
        "status", status).record(end - start, TimeUnit.MILLISECONDS);
}
gseClient.tearDown();
}
}
Expand Down Expand Up @@ -192,7 +210,7 @@ private String getOneAliveIP(List<String> ipList) {
/**
* 传入的multiIp为逗号分隔的不带云区域ID的IP
*
* @param multiIp 多个IP
* @param multiIp 多IP,格式:ip1,ip2,ip3...ipN
* @param cloudAreaId 云区域ID
* @return 返回的单个IP不带云区域ID
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,9 @@
* IN THE SOFTWARE.
*/

package com.tencent.bk.job.common.util;
package com.tencent.bk.job.common.gse.util;

import com.tencent.bk.job.common.gse.constants.AgentStatusEnum;

public final class AgentUtils {

Expand All @@ -33,9 +35,9 @@ private AgentUtils() {
* 返回Agent的状态是否正常
*
* @param status agent的状态
* @return 正常1
* @return 正常: true, 异常: false
*/
public static boolean isAgentOkByStatus(int status) {
return status == 1;
return status == AgentStatusEnum.ALIVE.getValue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import com.tencent.bk.job.common.gse.service.QueryAgentStatusClient;
import com.tencent.bk.job.common.gse.service.QueryAgentStatusClientImpl;
import io.micrometer.core.instrument.MeterRegistry;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand All @@ -34,7 +35,9 @@
public class GseClientAutoConfig {

@Bean
public QueryAgentStatusClient gseCacheClient(@Autowired GseConfig gseConfigForExecute) {
@Autowired
public QueryAgentStatusClient gseCacheClient(GseConfig gseConfigForExecute,
MeterRegistry meterRegistry) {
com.tencent.bk.job.common.gse.config.GseConfig gseConfig = new com.tencent.bk.job.common.gse.config.GseConfig();
gseConfig.setEnableSsl(gseConfigForExecute.isGseSSLEnable());
String[] cacheApiServerHosts = gseConfigForExecute.getGseCacheApiServerHost().split(",");
Expand All @@ -48,6 +51,6 @@ public QueryAgentStatusClient gseCacheClient(@Autowired GseConfig gseConfigForEx
gseConfig.setTrustManagerType(gseConfigForExecute.getGseSSLTruststoreManagerType());
gseConfig.setQueryBatchSize(gseConfigForExecute.getGseQueryBatchSize());
gseConfig.setQueryThreadsNum(gseConfigForExecute.getGseQueryThreadsNum());
return new QueryAgentStatusClientImpl(gseConfig);
return new QueryAgentStatusClientImpl(gseConfig, meterRegistry);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

package com.tencent.bk.job.execute.config;

import com.tencent.bk.job.common.gse.constants.GseConstants;
import com.tencent.bk.job.execute.monitor.ExecuteMetricNames;
import com.tencent.bk.job.execute.monitor.ExecuteMetricTags;
import io.micrometer.core.instrument.Meter;
Expand All @@ -47,7 +48,7 @@ public DistributionStatisticConfig configure(Meter.Id id, DistributionStatisticC
// [10ms,3s]
.minimumExpectedValue(10_000_000.0).maximumExpectedValue(3_000_000_000.0)
.build().merge(config);
} else if (metricName.startsWith(ExecuteMetricNames.GSE_API_PREFIX)) {
} else if (metricName.startsWith(GseConstants.GSE_API_METRICS_NAME_PREFIX)) {
return DistributionStatisticConfig.builder().percentilesHistogram(true)
// [10ms,1s]
.minimumExpectedValue(10_000_000.0).maximumExpectedValue(1_000_000_000.0)
Expand Down
Loading

0 comments on commit 0be41df

Please sign in to comment.