Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature-2925][server] Cherry pick from dev to init TaskLogger in TaskExecuteProcessor #4163

Merged
merged 2 commits into from
Dec 7, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ public abstract class AbstractShell {
/**
* If or not script finished executing
*/
private volatile AtomicBoolean completed;
private AtomicBoolean completed;

public AbstractShell() {
this(0L);
Expand Down Expand Up @@ -128,7 +128,7 @@ protected void run() throws IOException {
/**
* Run a command actual work
*/
private void runCommand() throws IOException {
private void runCommand() throws IOException {
ProcessBuilder builder = new ProcessBuilder(getExecString());
Timer timeOutTimer = null;
ShellTimeoutTimerTask timeoutTimerTask = null;
Expand All @@ -153,11 +153,11 @@ private void runCommand() throws IOException {
timeOutTimer.schedule(timeoutTimerTask, timeOutInterval);
}
final BufferedReader errReader =
new BufferedReader(new InputStreamReader(process
.getErrorStream()));
BufferedReader inReader =
new BufferedReader(new InputStreamReader(process
.getInputStream()));
new BufferedReader(
new InputStreamReader(process.getErrorStream()));
BufferedReader inReader =
new BufferedReader(
new InputStreamReader(process.getInputStream()));
final StringBuilder errMsg = new StringBuilder();

// read error and input streams as this would free up the buffers
Expand All @@ -177,23 +177,35 @@ public void run() {
}
}
};
Thread inThread = new Thread() {
@Override
public void run() {
try {
parseExecResult(inReader);
} catch (IOException ioe) {
logger.warn("Error reading the in stream", ioe);
}
super.run();
}
};
try {
errThread.start();
inThread.start();
} catch (IllegalStateException ise) { }
try {
// parse the output
parseExecResult(inReader);
exitCode = process.waitFor();
exitCode = process.waitFor();
try {
// make sure that the error thread exits
// make sure that the error and in thread exits
errThread.join();
inThread.join();
} catch (InterruptedException ie) {
logger.warn("Interrupted while reading the error stream", ie);
logger.warn("Interrupted while reading the error and in stream", ie);
}
completed.set(true);
completed.compareAndSet(false,true);
//the timeout thread handling
//taken care in finally block
if (exitCode != 0) {
if (exitCode != 0 || errMsg.length() > 0) {
throw new ExitCodeException(exitCode, errMsg.toString());
}
} catch (InterruptedException ie) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,18 +16,32 @@
*/
package org.apache.dolphinscheduler.common.utils;

import static org.apache.dolphinscheduler.common.Constants.DATA_BASEDIR_PATH;
import static org.apache.dolphinscheduler.common.Constants.RESOURCE_VIEW_SUFFIXS;
import static org.apache.dolphinscheduler.common.Constants.RESOURCE_VIEW_SUFFIXS_DEFAULT_VALUE;
import static org.apache.dolphinscheduler.common.Constants.YYYYMMDDHHMMSS;

import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileWriter;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.StringReader;
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.nio.charset.UnsupportedCharsetException;
import java.util.Optional;

import org.apache.commons.io.Charsets;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.*;
import java.nio.charset.Charset;
import java.nio.charset.UnsupportedCharsetException;

import static org.apache.dolphinscheduler.common.Constants.*;

/**
* file utils
*/
Expand All @@ -36,6 +50,8 @@ public class FileUtils {

public static final String DATA_BASEDIR = PropertyUtils.getString(DATA_BASEDIR_PATH,"/tmp/dolphinscheduler");

public static final ThreadLocal<Logger> taskLoggerThreadLocal = new ThreadLocal<>();

/**
* get file suffix
*
Expand Down Expand Up @@ -118,7 +134,7 @@ public static String getProcessExecDir(int projectId, int processDefineId, int p
String fileName = String.format("%s/exec/process/%s/%s/%s", DATA_BASEDIR, Integer.toString(projectId),
Integer.toString(processDefineId), Integer.toString(processInstanceId));
File file = new File(fileName);
if (!file.getParentFile().exists()){
if (!file.getParentFile().exists()) {
file.getParentFile().mkdirs();
}

Expand All @@ -138,24 +154,40 @@ public static String getResourceViewSuffixs() {
* @param userName user name
* @throws IOException errors
*/
public static void createWorkDirAndUserIfAbsent(String execLocalPath, String userName) throws IOException{
public static void createWorkDirAndUserIfAbsent(String execLocalPath, String userName) throws IOException {
//if work dir exists, first delete
File execLocalPathFile = new File(execLocalPath);

if (execLocalPathFile.exists()){
if (execLocalPathFile.exists()) {
org.apache.commons.io.FileUtils.forceDelete(execLocalPathFile);
}

//create work dir
org.apache.commons.io.FileUtils.forceMkdir(execLocalPathFile);
logger.info("create dir success {}" , execLocalPath);

String mkdirLog = "create dir success " + execLocalPath;
LoggerUtils.logInfo(Optional.ofNullable(logger), mkdirLog);
LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), mkdirLog);

//if not exists this user,then create
if (!OSUtils.getUserList().contains(userName)){
OSUtils.createUser(userName);
OSUtils.taskLoggerThreadLocal.set(taskLoggerThreadLocal.get());
try {
if (!OSUtils.getUserList().contains(userName)) {
boolean isSuccessCreateUser = OSUtils.createUser(userName);

String infoLog;
if (isSuccessCreateUser) {
infoLog = String.format("create user name success %s", userName);
} else {
infoLog = String.format("create user name fail %s", userName);
}
LoggerUtils.logInfo(Optional.ofNullable(logger), infoLog);
LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), infoLog);
}
} catch (Throwable e) {
LoggerUtils.logError(Optional.ofNullable(logger), e);
LoggerUtils.logError(Optional.ofNullable(taskLoggerThreadLocal.get()), e);
}
logger.info("create user name success {}", userName);
OSUtils.taskLoggerThreadLocal.remove();
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

Expand Down Expand Up @@ -93,4 +94,24 @@ public static List<String> getAppIds(String log, Logger logger) {
}
return appIds;
}

public static void logError(Optional<Logger> optionalLogger
, String error) {
optionalLogger.ifPresent((Logger logger) -> logger.error(error));
}

public static void logError(Optional<Logger> optionalLogger
, Throwable e) {
optionalLogger.ifPresent((Logger logger) -> logger.error(e.getMessage(), e));
}

public static void logError(Optional<Logger> optionalLogger
, String error, Throwable e) {
optionalLogger.ifPresent((Logger logger) -> logger.error(error, e));
}

public static void logInfo(Optional<Logger> optionalLogger
, String info) {
optionalLogger.ifPresent((Logger logger) -> logger.info(info));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@
*/
package org.apache.dolphinscheduler.common.utils;

import org.apache.commons.configuration.Configuration;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.shell.ShellExecutor;
import org.apache.commons.configuration.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import oshi.SystemInfo;
Expand All @@ -36,10 +36,7 @@
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.*;
import java.util.regex.Pattern;

/**
Expand All @@ -50,6 +47,8 @@ public class OSUtils {

private static final Logger logger = LoggerFactory.getLogger(OSUtils.class);

public static final ThreadLocal<Logger> taskLoggerThreadLocal = new ThreadLocal<>();

private static final SystemInfo SI = new SystemInfo();
public static final String TWO_DECIMAL = "0.00";

Expand Down Expand Up @@ -236,7 +235,9 @@ public static boolean createUser(String userName) {
try {
String userGroup = OSUtils.getGroup();
if (StringUtils.isEmpty(userGroup)) {
logger.error("{} group does not exist for this operating system.", userGroup);
String errorLog = String.format("%s group does not exist for this operating system.", userGroup);
LoggerUtils.logError(Optional.ofNullable(logger), errorLog);
LoggerUtils.logError(Optional.ofNullable(taskLoggerThreadLocal.get()), errorLog);
return false;
}
if (isMacOS()) {
Expand All @@ -248,7 +249,8 @@ public static boolean createUser(String userName) {
}
return true;
} catch (Exception e) {
logger.error(e.getMessage(), e);
LoggerUtils.logError(Optional.ofNullable(logger), e);
LoggerUtils.logError(Optional.ofNullable(taskLoggerThreadLocal.get()), e);
}

return false;
Expand All @@ -261,10 +263,14 @@ public static boolean createUser(String userName) {
* @throws IOException in case of an I/O error
*/
private static void createLinuxUser(String userName, String userGroup) throws IOException {
logger.info("create linux os user : {}", userName);
String cmd = String.format("sudo useradd -g %s %s", userGroup, userName);
String infoLog1 = String.format("create linux os user : %s", userName);
LoggerUtils.logInfo(Optional.ofNullable(logger), infoLog1);
LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), infoLog1);

logger.info("execute cmd : {}", cmd);
String cmd = String.format("sudo useradd -g %s %s", userGroup, userName);
String infoLog2 = String.format("execute cmd : %s", cmd);
LoggerUtils.logInfo(Optional.ofNullable(logger), infoLog2);
LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), infoLog2);
OSUtils.exeCmd(cmd);
}

Expand All @@ -275,13 +281,24 @@ private static void createLinuxUser(String userName, String userGroup) throws IO
* @throws IOException in case of an I/O error
*/
private static void createMacUser(String userName, String userGroup) throws IOException {
logger.info("create mac os user : {}", userName);
String userCreateCmd = String.format("sudo sysadminctl -addUser %s -password %s", userName, userName);
String appendGroupCmd = String.format("sudo dseditgroup -o edit -a %s -t user %s", userName, userGroup);

logger.info("create user command : {}", userCreateCmd);
OSUtils.exeCmd(userCreateCmd);
logger.info("append user to group : {}", appendGroupCmd);
Optional<Logger> optionalLogger = Optional.ofNullable(logger);
Optional<Logger> optionalTaskLogger = Optional.ofNullable(taskLoggerThreadLocal.get());

String infoLog1 = String.format("create mac os user : %s", userName);
LoggerUtils.logInfo(optionalLogger, infoLog1);
LoggerUtils.logInfo(optionalTaskLogger, infoLog1);

String createUserCmd = String.format("sudo sysadminctl -addUser %s -password %s", userName, userName);
String infoLog2 = String.format("create user command : %s", createUserCmd);
LoggerUtils.logInfo(optionalLogger, infoLog2);
LoggerUtils.logInfo(optionalTaskLogger, infoLog2);
OSUtils.exeCmd(createUserCmd);

String appendGroupCmd = String.format("sudo dseditgroup -o edit -a %s -t user %s", userName, userGroup);
String infoLog3 = String.format("append user to group : %s", appendGroupCmd);
LoggerUtils.logInfo(optionalLogger, infoLog3);
LoggerUtils.logInfo(optionalTaskLogger, infoLog3);
OSUtils.exeCmd(appendGroupCmd);
}

Expand All @@ -292,14 +309,20 @@ private static void createMacUser(String userName, String userGroup) throws IOEx
* @throws IOException in case of an I/O error
*/
private static void createWindowsUser(String userName, String userGroup) throws IOException {
logger.info("create windows os user : {}", userName);
String userCreateCmd = String.format("net user \"%s\" /add", userName);
String appendGroupCmd = String.format("net localgroup \"%s\" \"%s\" /add", userGroup, userName);
String infoLog1 = String.format("create windows os user : %s", userName);
LoggerUtils.logInfo(Optional.ofNullable(logger), infoLog1);
LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), infoLog1);

logger.info("execute create user command : {}", userCreateCmd);
String userCreateCmd = String.format("net user \"%s\" /add", userName);
String infoLog2 = String.format("execute create user command : %s", userCreateCmd);
LoggerUtils.logInfo(Optional.ofNullable(logger), infoLog2);
LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), infoLog2);
OSUtils.exeCmd(userCreateCmd);

logger.info("execute append user to group : {}", appendGroupCmd);
String appendGroupCmd = String.format("net localgroup \"%s\" \"%s\" /add", userGroup, userName);
String infoLog3 = String.format("execute append user to group : %s", appendGroupCmd);
LoggerUtils.logInfo(Optional.ofNullable(logger), infoLog3);
LoggerUtils.logInfo(Optional.ofNullable(taskLoggerThreadLocal.get()), infoLog3);
OSUtils.exeCmd(appendGroupCmd);
}

Expand Down Expand Up @@ -338,22 +361,12 @@ public static String getGroup() throws IOException {
* @throws IOException errors
*/
public static String exeCmd(String command) throws IOException {
BufferedReader br = null;

try {
Process p = Runtime.getRuntime().exec(command);
br = new BufferedReader(new InputStreamReader(p.getInputStream()));
String line;
StringBuilder sb = new StringBuilder();

while ((line = br.readLine()) != null) {
sb.append(line + "\n");
}

return sb.toString();
} finally {
IOUtils.closeQuietly(br);
StringTokenizer st = new StringTokenizer(command);
String[] cmdArray = new String[st.countTokens()];
for (int i = 0; st.hasMoreTokens(); i++) {
cmdArray[i] = st.nextToken();
}
return exeShell(cmdArray);
}

/**
Expand All @@ -362,7 +375,7 @@ public static String exeCmd(String command) throws IOException {
* @return result of execute the shell
* @throws IOException errors
*/
public static String exeShell(String command) throws IOException {
public static String exeShell(String[] command) throws IOException {
return ShellExecutor.execCommand(command);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ public void getGroup() {
@Test
public void createUser() {
boolean result = OSUtils.createUser("test123");
Assert.assertTrue(result);
if (result) {
Assert.assertTrue("create user test123 success", true);
} else {
Assert.assertTrue("create user test123 fail", true);
}
}

@Test
Expand Down
Loading