Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improvement][Service]部分组件角色退出运行后无法更新状态 #604

Open
wants to merge 3 commits into
base: dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ public static void init() throws UnknownHostException, NoSuchAlgorithmException
new HostCheckCommand(),
actorSystem.dispatcher(),
ActorRef.noSender());


// 角色检测 15s 检测一次
actorSystem.scheduler().schedule(
FiniteDuration.apply(15L, TimeUnit.SECONDS),
FiniteDuration.apply(15L, TimeUnit.SECONDS),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.datasophon.api.strategy.ServiceRoleStrategyContext;
import com.datasophon.api.utils.SpringTool;
import com.datasophon.common.Constants;
import com.datasophon.common.enums.ServiceRoleType;
import com.datasophon.common.command.ServiceRoleCheckCommand;
import com.datasophon.dao.entity.ClusterServiceRoleInstanceEntity;

Expand All @@ -35,33 +36,27 @@
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;

import akka.actor.UntypedActor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ServiceRoleCheckActor extends UntypedActor {


private static final Logger logger = LoggerFactory.getLogger(ServiceRoleCheckActor.class);

@Override
public void onReceive(Object msg) throws Throwable {
if (msg instanceof ServiceRoleCheckCommand) {
logger.info("start to service role info");
ClusterServiceRoleInstanceService roleInstanceService =
SpringTool.getApplicationContext()
.getBean(ClusterServiceRoleInstanceService.class);

List<ClusterServiceRoleInstanceEntity> list =
roleInstanceService.list(
new QueryWrapper<ClusterServiceRoleInstanceEntity>()
.in(
Constants.SERVICE_ROLE_NAME,
"Prometheus",
"AlertManager",
"Krb5Kdc",
"KAdmin",
"SRFE",
"SRBE",
"DorisFE",
"DorisFEObserver",
"DorisBE",
"NameNode",
"ResourceManager",
"ElasticSearch"));
.ne(
Constants.ROLE_TYPE,
ServiceRoleType.CLIENT.getCode());

if (!list.isEmpty()) {
Map<String, ClusterServiceRoleInstanceEntity> map = translateListToMap(list);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.datasophon.api.strategy;

import com.datasophon.api.load.ServiceInfoMap;
import com.datasophon.api.load.ServiceRoleMap;
import com.datasophon.api.master.ActorUtils;
import com.datasophon.api.utils.ProcessUtils;
import com.datasophon.common.Constants;
import com.datasophon.common.command.ExecuteCmdCommand;
import com.datasophon.common.model.ServiceConfig;
import com.datasophon.common.model.ServiceInfo;
import com.datasophon.common.model.ServiceRoleInfo;
import com.datasophon.common.utils.ExecResult;
import com.datasophon.dao.entity.ClusterInfoEntity;
import com.datasophon.dao.entity.ClusterServiceRoleInstanceEntity;
import com.datasophon.dao.enums.AlertLevel;

import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import akka.actor.ActorSelection;
import akka.pattern.Patterns;
import akka.util.Timeout;

/**
 * Generic {@link ServiceRoleStrategy} that performs a liveness check by running the
 * role's configured status command on the worker host.
 *
 * <p>{@code ServiceRoleStrategyContext} hands this strategy out as the default when no
 * dedicated strategy is registered for a role, so it must tolerate roles whose
 * service/role metadata or status runner is not available.
 *
 * <p>All hooks other than {@link #handlerServiceRoleCheck} are intentionally no-ops.
 */
public class CommonHandlerStrategy implements ServiceRoleStrategy {

    /** Maximum time, in seconds, to wait for the worker's reply to the status command. */
    private static final long STATUS_CHECK_TIMEOUT_SECONDS = 30L;

    /** No-op: the generic strategy has no host-level handling. */
    @Override
    public void handler(Integer clusterId, List<String> hosts, String serviceName) {
        // intentionally empty
    }

    /** No-op: the generic strategy does not post-process service configuration. */
    @Override
    public void handlerConfig(Integer clusterId, List<ServiceConfig> list, String serviceName) {
        // intentionally empty
    }

    /** No-op: the generic strategy contributes no configuration. */
    @Override
    public void getConfig(Integer clusterId, List<ServiceConfig> list) {
        // intentionally empty
    }

    /** No-op: the generic strategy does not adjust role info. */
    @Override
    public void handlerServiceRoleInfo(ServiceRoleInfo serviceRoleInfo, String hostname) {
        // intentionally empty
    }

    /**
     * Runs the role's status command on the host of {@code roleInstanceEntity} and
     * records the outcome: the alert is recovered when the command reports success,
     * and an {@code EXCEPTION}-level "Survive" alert is saved when it reports failure,
     * times out, or the remote call fails for any other reason.
     *
     * <p>If the cluster, service, role metadata, or status runner cannot be resolved,
     * the check is skipped: without a status command the role's state cannot be
     * determined, and raising an alert would be a false alarm.
     *
     * @param roleInstanceEntity the role instance to check
     * @param map                role instances keyed by the caller; not used by this strategy
     */
    @Override
    public void handlerServiceRoleCheck(ClusterServiceRoleInstanceEntity roleInstanceEntity,
                                        Map<String, ClusterServiceRoleInstanceEntity> map) {
        if (roleInstanceEntity == null) {
            return;
        }

        ClusterInfoEntity cluster = ProcessUtils.getClusterInfo(roleInstanceEntity.getClusterId());
        if (cluster == null) {
            return;
        }
        String frameCode = cluster.getClusterFrame();

        String serviceKey = frameCode + Constants.UNDERLINE + roleInstanceEntity.getServiceName();
        String roleKey = serviceKey + Constants.UNDERLINE + roleInstanceEntity.getServiceRoleName();
        ServiceRoleInfo serviceRoleInfo = ServiceRoleMap.get(roleKey);
        ServiceInfo serviceInfo = ServiceInfoMap.get(serviceKey);

        // This strategy is used for arbitrary roles, so metadata or a status runner may be absent.
        if (serviceInfo == null || serviceRoleInfo == null || serviceRoleInfo.getStatusRunner() == null) {
            return;
        }

        ArrayList<String> commandList = new ArrayList<>();
        commandList.add(serviceInfo.getDecompressPackageName() + Constants.SLASH
                + serviceRoleInfo.getStatusRunner().getProgram());
        if (serviceRoleInfo.getStatusRunner().getArgs() != null) {
            commandList.addAll(serviceRoleInfo.getStatusRunner().getArgs());
        }
        ExecuteCmdCommand cmdCommand = new ExecuteCmdCommand();
        cmdCommand.setCommands(commandList);

        ActorSelection execCmdActor = ActorUtils.actorSystem.actorSelection(
                "akka.tcp://datasophon@" + roleInstanceEntity.getHostname() + ":2552/user/worker/executeCmdActor");
        Timeout timeout = new Timeout(Duration.create(STATUS_CHECK_TIMEOUT_SECONDS, TimeUnit.SECONDS));
        Future<Object> execFuture = Patterns.ask(execCmdActor, cmdCommand, timeout);
        try {
            ExecResult execResult = (ExecResult) Await.result(execFuture, timeout.duration());
            if (execResult.getExecResult()) {
                ProcessUtils.recoverAlert(roleInstanceEntity);
            } else {
                saveSurviveAlert(roleInstanceEntity);
            }
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                // Preserve the interrupt so the calling thread can still observe it.
                Thread.currentThread().interrupt();
            }
            // Timeout, unreachable worker, or unexpected reply: treat the role as not running.
            saveSurviveAlert(roleInstanceEntity);
        }
    }

    /** Saves the EXCEPTION-level "&lt;role&gt; Survive" alert for the given role instance. */
    private void saveSurviveAlert(ClusterServiceRoleInstanceEntity roleInstanceEntity) {
        String alertTargetName = roleInstanceEntity.getServiceRoleName() + " Survive";
        ProcessUtils.saveAlert(roleInstanceEntity, alertTargetName, AlertLevel.EXCEPTION, "restart");
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@
import java.util.concurrent.ConcurrentHashMap;

public class ServiceRoleStrategyContext {

private static final ServiceRoleStrategy commonStrategy=new CommonHandlerStrategy();

private static final String commonServiceName="CommonService";

private static final Map<String, ServiceRoleStrategy> strategyMap = new ConcurrentHashMap<>();

private static final Map<String, String> serviceNameMap = new ConcurrentHashMap<>();
Expand Down Expand Up @@ -103,13 +106,15 @@ public static ServiceRoleStrategy getServiceRoleHandler(String type) {
if (StringUtils.isBlank(type)) {
return null;
}
return strategyMap.get(type);
ServiceRoleStrategy strategy=strategyMap.get(type);
return (strategy == null) ? commonStrategy : strategy;
}

public static String getServiceName(String type) {
if (StringUtils.isBlank(type)) {
return null;
}
return serviceNameMap.get(type);
String name=serviceNameMap.get(type);
return (name == null) ? commonServiceName : name;
}
}