Skip to content

Commit

Permalink
refactor: improve and refine the usage scope of interfaces and classes (
Browse files Browse the repository at this point in the history
#429)

Co-authored-by: gaoxh <32359336+gaoxh@users.noreply.github.com>
  • Loading branch information
psxjoy and gaoxh authored Sep 10, 2024
1 parent 4f4b57d commit c0fe67e
Show file tree
Hide file tree
Showing 11 changed files with 453 additions and 271 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,14 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.xiaomi.mone.log.server.porcessor;

import com.xiaomi.data.push.rpc.netty.NettyRequestProcessor;
import com.xiaomi.data.push.rpc.protocol.RemotingCommand;
import com.xiaomi.mone.log.api.model.meta.LogCollectMeta;
import com.xiaomi.mone.log.api.service.AgentConfigService;
import com.xiaomi.mone.log.common.Constant;
import com.xiaomi.mone.log.server.service.AgentConfigAcquirer;
import com.xiaomi.mone.log.server.service.DefaultAgentConfigAcquirer;
import com.xiaomi.youpin.docean.Ioc;
import io.netty.channel.ChannelHandlerContext;
Expand All @@ -35,22 +36,22 @@
*/
@Slf4j
public class AgentConfigProcessor implements NettyRequestProcessor {

@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws Exception {
RemotingCommand response = RemotingCommand.createResponseCommand(Constant.RPCCMD_AGENT_CONFIG_CODE);
String ip = new String(request.getBody());
log.info("agent start get metadata config,agent ip:{}", ip);

AgentConfigAcquirer agentConfigService = Ioc.ins().getBean(DefaultAgentConfigAcquirer.class);

AgentConfigService agentConfigService = Ioc.ins().getBean(DefaultAgentConfigAcquirer.class);
LogCollectMeta logCollectMeta = agentConfigService.getLogCollectMetaFromManager(ip);
String responseInfo = GSON.toJson(logCollectMeta);
log.info("agent start get metadata config info:{}", responseInfo);
response.setBody(responseInfo.getBytes());
return response;
}

@Override
public boolean rejectRequest() {
return false;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.xiaomi.mone.log.server.service;

import com.xiaomi.mone.log.api.model.meta.LogCollectMeta;
Expand All @@ -29,11 +30,11 @@
*/
@Component
@Slf4j
public class DefaultAgentConfigAcquirer implements AgentConfigAcquirer {

public class DefaultAgentConfigAcquirer implements AgentConfigService {
@Reference(interfaceClass = AgentConfigService.class, group = "$dubbo.group", check = false, timeout = 10000)
private AgentConfigService agentConfigService;

@Override
public LogCollectMeta getLogCollectMetaFromManager(String ip) {
LogCollectMeta logCollectMeta = new LogCollectMeta();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,117 +13,28 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.xiaomi.mone.log.agent.service;

import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.google.common.collect.Maps;
import com.xiaomi.data.push.common.SafeRun;
import com.xiaomi.data.push.nacos.NacosNaming;
import com.xiaomi.data.push.rpc.RpcClient;
import com.xiaomi.mone.log.common.Config;
import com.xiaomi.mone.log.utils.NetUtil;
import com.xiaomi.youpin.docean.anno.Service;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

import javax.annotation.Resource;
import java.net.InetAddress;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.Objects;

import static com.xiaomi.mone.log.common.Constant.STREAM_CONTAINER_POD_NAME_KEY;
import static com.xiaomi.mone.log.utils.ConfigUtils.getDataHashKey;

/**
* @author wtt
* @version 1.0
* @description
* @date 2023/12/19 14:15
*/
@Service
@Slf4j
public class ServiceRegistryService {

@Resource
private RpcClient rpcClient;
private final static String SERVER_PREFIX = "prometheus_server";
private final static String APP_NAME_LABEL = "app_name";
private final static String APP_ID_LABEL = "app_id";
private final static String ENV_ID_LABEL = "env_id";
private final static String ENV_NAME_LABEL = "env_name";
private static final String DEFAULT_TIME_DATE_FORMAT = "yyyy-MM-dd hh:mm:ss";
public static final String STREAM_VERSION = "hera-log-agent:1.0.0:2023-12-20";

private String appName;
private String appId;
private String envId;
private String envName;
private Integer port;
private String ip;

public void init() {
this.initializeEnvironmentParameters();
String registrationInitiationFlag = Config.ins().get("registration_initiation_flag", "false");
if (Objects.equals("true", registrationInitiationFlag)) {
this.registerServiceInstance();
}
}

private void registerServiceInstance() {
NacosNaming nacosNaming = rpcClient.getNacosNaming();
int appIndex = getDataHashKey(ip, Integer.parseInt(Config.ins().get("app_max_index", "30")));
String serviceName = String.format("%s_%s_%s_%s", SERVER_PREFIX, appId, appName, appIndex);

try {
nacosNaming.registerInstance(serviceName, buildInstance(serviceName));
addShutdownHook(nacosNaming, serviceName);
} catch (NacosException e) {
log.error("registerService error,serviceName:{}", serviceName, e);
}
}

private void addShutdownHook(NacosNaming nacosNaming, String serviceName) {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
log.info("agent unregisters the instance and the service name:{}", serviceName);
nacosNaming.deregisterInstance(serviceName, ip, port);
} catch (NacosException e) {
log.error("agent unregisters the instance error,service name:{}", serviceName, e);
}
}));
}

private Instance buildInstance(String serviceName) {
Instance instance = new Instance();
instance.setEnabled(true);
instance.setHealthy(true);
instance.setIp(ip);
instance.setPort(port);
instance.setServiceName(serviceName);

Map<String, String> metaData = Maps.newHashMap();
metaData.put("ctime", new SimpleDateFormat(DEFAULT_TIME_DATE_FORMAT).format(new Date()));
metaData.put("version", STREAM_VERSION);
metaData.put(STREAM_CONTAINER_POD_NAME_KEY, System.getenv(STREAM_CONTAINER_POD_NAME_KEY));
metaData.put(ENV_ID_LABEL, envId);
metaData.put(ENV_NAME_LABEL, envName);

SafeRun.run(() -> metaData.put("hostname", InetAddress.getLocalHost().getHostName()));
instance.setMetadata(metaData);

return instance;
}


public void initializeEnvironmentParameters() {
appName = StringUtils.isNotBlank(System.getenv(APP_NAME_LABEL)) ? System.getenv(APP_NAME_LABEL) : "log_agent";
appId = StringUtils.isNotBlank(System.getenv(APP_ID_LABEL)) ? System.getenv(APP_ID_LABEL) : "10010";
envName = StringUtils.isNotBlank(System.getenv(ENV_NAME_LABEL)) ? System.getenv(ENV_NAME_LABEL) : "default_env";
envId = StringUtils.isNotBlank(System.getenv(ENV_ID_LABEL)) ? System.getenv(ENV_ID_LABEL) : "1";
port = Integer.parseInt(Config.ins().get("port", "9799"));
ip = NetUtil.getLocalIp();
}
public interface ServiceRegistryService {

void init();

void registerServiceInstance();

void addShutdownHook(NacosNaming nacosNaming, String serviceName);

Instance buildInstance(String serviceName);

void initializeEnvironmentParameters();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Copyright (C) 2020 Xiaomi Corporation
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.xiaomi.mone.log.agent.service.impl;

import com.alibaba.nacos.api.exception.NacosException;
import com.alibaba.nacos.api.naming.pojo.Instance;
import com.google.common.collect.Maps;
import com.xiaomi.data.push.common.SafeRun;
import com.xiaomi.data.push.nacos.NacosNaming;
import com.xiaomi.data.push.rpc.RpcClient;
import com.xiaomi.mone.log.agent.service.ServiceRegistryService;
import com.xiaomi.mone.log.common.Config;
import com.xiaomi.mone.log.utils.NetUtil;
import com.xiaomi.youpin.docean.anno.Service;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;

import javax.annotation.Resource;
import java.net.InetAddress;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
import java.util.Objects;

import static com.xiaomi.mone.log.common.Constant.STREAM_CONTAINER_POD_NAME_KEY;
import static com.xiaomi.mone.log.utils.ConfigUtils.getDataHashKey;

/**
* @author wtt
* @version 1.0
* @description
* @date 2023/12/19 14:15
*/
@Service
@Slf4j
public class ServiceRegistryServiceImpl implements ServiceRegistryService {

@Resource
private RpcClient rpcClient;

private final static String SERVER_PREFIX = "prometheus_server";

private final static String APP_NAME_LABEL = "app_name";

private final static String APP_ID_LABEL = "app_id";

private final static String ENV_ID_LABEL = "env_id";

private final static String ENV_NAME_LABEL = "env_name";

private static final String DEFAULT_TIME_DATE_FORMAT = "yyyy-MM-dd hh:mm:ss";

public static final String STREAM_VERSION = "hera-log-agent:1.0.0:2023-12-20";

private String appName;

private String appId;

private String envId;

private String envName;

private Integer port;

private String ip;

@Override
public void init() {
this.initializeEnvironmentParameters();
String registrationInitiationFlag = Config.ins().get("registration_initiation_flag", "false");
if (Objects.equals("true", registrationInitiationFlag)) {
this.registerServiceInstance();
}
}

@Override
public void registerServiceInstance() {
NacosNaming nacosNaming = rpcClient.getNacosNaming();
int appIndex = getDataHashKey(ip, Integer.parseInt(Config.ins().get("app_max_index", "30")));
String serviceName = String.format("%s_%s_%s_%s", SERVER_PREFIX, appId, appName, appIndex);

try {
nacosNaming.registerInstance(serviceName, buildInstance(serviceName));
addShutdownHook(nacosNaming, serviceName);
} catch (NacosException e) {
log.error("registerService error,serviceName:{}", serviceName, e);
}
}

@Override
public void addShutdownHook(NacosNaming nacosNaming, String serviceName) {
Runtime.getRuntime().addShutdownHook(new Thread(() -> {
try {
log.info("agent unregisters the instance and the service name:{}", serviceName);
nacosNaming.deregisterInstance(serviceName, ip, port);
} catch (NacosException e) {
log.error("agent unregisters the instance error,service name:{}", serviceName, e);
}
}));
}

@Override
public Instance buildInstance(String serviceName) {
Instance instance = new Instance();
instance.setEnabled(true);
instance.setHealthy(true);
instance.setIp(ip);
instance.setPort(port);
instance.setServiceName(serviceName);

Map<String, String> metaData = Maps.newHashMap();
metaData.put("ctime", new SimpleDateFormat(DEFAULT_TIME_DATE_FORMAT).format(new Date()));
metaData.put("version", STREAM_VERSION);
metaData.put(STREAM_CONTAINER_POD_NAME_KEY, System.getenv(STREAM_CONTAINER_POD_NAME_KEY));
metaData.put(ENV_ID_LABEL, envId);
metaData.put(ENV_NAME_LABEL, envName);

SafeRun.run(() -> metaData.put("hostname", InetAddress.getLocalHost().getHostName()));
instance.setMetadata(metaData);

return instance;
}

@Override
public void initializeEnvironmentParameters() {
appName = StringUtils.isNotBlank(System.getenv(APP_NAME_LABEL)) ? System.getenv(APP_NAME_LABEL) : "log_agent";
appId = StringUtils.isNotBlank(System.getenv(APP_ID_LABEL)) ? System.getenv(APP_ID_LABEL) : "10010";
envName = StringUtils.isNotBlank(System.getenv(ENV_NAME_LABEL)) ? System.getenv(ENV_NAME_LABEL) : "default_env";
envId = StringUtils.isNotBlank(System.getenv(ENV_ID_LABEL)) ? System.getenv(ENV_ID_LABEL) : "1";
port = Integer.parseInt(Config.ins().get("port", "9799"));
ip = NetUtil.getLocalIp();
}
}
Loading

0 comments on commit c0fe67e

Please sign in to comment.