Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Improvement-3668][Server] ProcedureTask,SqlTask code structure standardization and checkstyle #3669

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,31 +14,40 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.dolphinscheduler.server.worker.task.processdure;

import com.cronutils.utils.StringUtils;
import org.apache.dolphinscheduler.common.Constants;
import org.apache.dolphinscheduler.common.enums.*;
import org.apache.dolphinscheduler.common.enums.CommandType;
import org.apache.dolphinscheduler.common.enums.DataType;
import org.apache.dolphinscheduler.common.enums.DbType;
import org.apache.dolphinscheduler.common.enums.Direct;
import org.apache.dolphinscheduler.common.enums.TaskTimeoutStrategy;
import org.apache.dolphinscheduler.common.process.Property;
import org.apache.dolphinscheduler.common.task.AbstractParameters;
import org.apache.dolphinscheduler.common.task.procedure.ProcedureParameters;
import org.apache.dolphinscheduler.common.utils.CollectionUtils;
import org.apache.dolphinscheduler.common.utils.*;
import org.apache.dolphinscheduler.common.utils.JSONUtils;
import org.apache.dolphinscheduler.common.utils.ParameterUtils;
import org.apache.dolphinscheduler.common.utils.StringUtils;
import org.apache.dolphinscheduler.dao.datasource.BaseDataSource;
import org.apache.dolphinscheduler.dao.datasource.DataSourceFactory;
import org.apache.dolphinscheduler.server.entity.TaskExecutionContext;
import org.apache.dolphinscheduler.server.utils.ParamUtils;
import org.apache.dolphinscheduler.server.worker.task.AbstractTask;
import org.slf4j.Logger;

import java.sql.*;
import java.sql.CallableStatement;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Types;
import java.util.Collection;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

import static org.apache.dolphinscheduler.common.enums.DataType.*;
import org.slf4j.Logger;

/**
* procedure task
Expand All @@ -54,8 +63,7 @@ public class ProcedureTask extends AbstractTask {
* base datasource
*/
private BaseDataSource baseDataSource;



/**
* taskExecutionContext
*/
Expand All @@ -68,14 +76,14 @@ public class ProcedureTask extends AbstractTask {
*/
public ProcedureTask(TaskExecutionContext taskExecutionContext, Logger logger) {
super(taskExecutionContext, logger);

this.taskExecutionContext = taskExecutionContext;

logger.info("procedure task params {}", taskExecutionContext.getTaskParams());
}

@Override
public void init() {
logger.info("procedure task params {}", taskExecutionContext.getTaskParams());
this.procedureParameters = JSONUtils.parseObject(taskExecutionContext.getTaskParams(), ProcedureParameters.class);


// check parameters
if (!procedureParameters.checkParameters()) {
throw new RuntimeException("procedure task params is not valid");
Expand Down Expand Up @@ -104,25 +112,21 @@ public void handle() throws Exception {
baseDataSource = DataSourceFactory.getDatasource(DbType.valueOf(procedureParameters.getType()),
taskExecutionContext.getProcedureTaskExecutionContext().getConnectionParams());


// get jdbc connection
connection = DriverManager.getConnection(baseDataSource.getJdbcUrl(),
baseDataSource.getUser(),
baseDataSource.getPassword());



// combining local and global parameters
Map<String, Property> paramsMap = ParamUtils.convert(ParamUtils.getUserDefParamsMap(taskExecutionContext.getDefinedParams()),
taskExecutionContext.getDefinedParams(),
procedureParameters.getLocalParametersMap(),
CommandType.of(taskExecutionContext.getCmdTypeIfComplement()),
taskExecutionContext.getScheduleTime());


Collection<Property> userDefParamsList = null;

if (procedureParameters.getLocalParametersMap() != null){
if (procedureParameters.getLocalParametersMap() != null) {
userDefParamsList = procedureParameters.getLocalParametersMap().values();
}

Expand All @@ -139,7 +143,6 @@ public void handle() throws Exception {
// outParameterMap
Map<Integer, Property> outParameterMap = getOutParameterMap(stmt, paramsMap, userDefParamsList);


stmt.executeUpdate();

/**
Expand All @@ -148,7 +151,7 @@ public void handle() throws Exception {
printOutParameter(stmt, outParameterMap);

setExitStatusCode(Constants.EXIT_CODE_SUCCESS);
}catch (Exception e){
} catch (Exception e) {
setExitStatusCode(Constants.EXIT_CODE_FAILURE);
logger.error("procedure task error",e);
throw e;
Expand All @@ -165,17 +168,17 @@ public void handle() throws Exception {
*/
private String getCallMethod(Collection<Property> userDefParamsList) {
String method;// no parameters
if (CollectionUtils.isEmpty(userDefParamsList)){
if (CollectionUtils.isEmpty(userDefParamsList)) {
method = "{call " + procedureParameters.getMethod() + "}";
}else { // exists parameters
} else { // exists parameters
int size = userDefParamsList.size();
StringBuilder parameter = new StringBuilder();
parameter.append("(");
for (int i = 0 ;i < size - 1; i++){
for (int i = 0;i < size - 1; i++) {
parameter.append("?,");
}
parameter.append("?)");
method = "{call " + procedureParameters.getMethod() + parameter.toString()+ "}";
method = "{call " + procedureParameters.getMethod() + parameter.toString() + "}";
}
return method;
}
Expand All @@ -184,12 +187,12 @@ private String getCallMethod(Collection<Property> userDefParamsList) {
* print outParameter
* @param stmt CallableStatement
* @param outParameterMap outParameterMap
* @throws SQLException
* @throws SQLException SQLException
*/
private void printOutParameter(CallableStatement stmt,
Map<Integer, Property> outParameterMap) throws SQLException {
Iterator<Map.Entry<Integer, Property>> iter = outParameterMap.entrySet().iterator();
while (iter.hasNext()){
while (iter.hasNext()) {
Map.Entry<Integer, Property> en = iter.next();

int index = en.getKey();
Expand All @@ -208,24 +211,24 @@ private void printOutParameter(CallableStatement stmt,
* @param paramsMap paramsMap
* @param userDefParamsList userDefParamsList
* @return outParameterMap
* @throws Exception
* @throws Exception Exception
*/
private Map<Integer, Property> getOutParameterMap(CallableStatement stmt,
Map<String, Property> paramsMap,
Collection<Property> userDefParamsList) throws Exception {
Map<Integer,Property> outParameterMap = new HashMap<>();
if (userDefParamsList != null && userDefParamsList.size() > 0){
if (userDefParamsList != null && userDefParamsList.size() > 0) {
int index = 1;
for (Property property : userDefParamsList){
for (Property property : userDefParamsList) {
logger.info("localParams : prop : {} , dirct : {} , type : {} , value : {}"
,property.getProp(),
property.getDirect(),
property.getType(),
property.getValue());
// set parameters
if (property.getDirect().equals(Direct.IN)){
if (property.getDirect().equals(Direct.IN)) {
ParameterUtils.setInParameter(index, stmt, property.getType(), paramsMap.get(property.getProp()).getValue());
}else if (property.getDirect().equals(Direct.OUT)){
} else if (property.getDirect().equals(Direct.OUT)) {
setOutParameter(index,stmt,property.getType(),paramsMap.get(property.getProp()).getValue());
property.setValue(paramsMap.get(property.getProp()).getValue());
outParameterMap.put(index,property);
Expand All @@ -239,50 +242,49 @@ private Map<Integer, Property> getOutParameterMap(CallableStatement stmt,
/**
* set timtou
* @param stmt CallableStatement
* @throws SQLException
* @throws SQLException SQLException
*/
private void setTimeout(CallableStatement stmt) throws SQLException {
Boolean failed = TaskTimeoutStrategy.of(taskExecutionContext.getTaskTimeoutStrategy()) == TaskTimeoutStrategy.FAILED;
Boolean warnfailed = TaskTimeoutStrategy.of(taskExecutionContext.getTaskTimeoutStrategy()) == TaskTimeoutStrategy.WARNFAILED;
if(failed || warnfailed){
if (failed || warnfailed) {
stmt.setQueryTimeout(taskExecutionContext.getTaskTimeout());
}
}

/**
* close jdbc resource
*
* @param stmt
* @param connection
* @param stmt PreparedStatement
* @param connection Connection
*/
private void close(PreparedStatement stmt,
Connection connection){
private void close(PreparedStatement stmt, Connection connection) {
if (stmt != null) {
try {
stmt.close();
} catch (SQLException e) {

logger.error("close prepared statement error : {}",e.getMessage(),e);
}
}
if (connection != null) {
try {
connection.close();
} catch (SQLException e) {

logger.error("close connection error : {}",e.getMessage(),e);
}
}
}

/**
* get output parameter
* @param stmt
* @param stmt CallableStatement
* @param index
* @param prop
* @param dataType
* @throws SQLException
* @throws SQLException SQLException
*/
private void getOutputParameter(CallableStatement stmt, int index, String prop, DataType dataType) throws SQLException {
switch (dataType){
switch (dataType) {
case VARCHAR:
logger.info("out prameter varchar key : {} , value : {}",prop,stmt.getString(index));
break;
Expand Down Expand Up @@ -328,66 +330,66 @@ public AbstractParameters getParameters() {
* @param value value
* @throws Exception exception
*/
private void setOutParameter(int index,CallableStatement stmt,DataType dataType,String value)throws Exception{
if (dataType.equals(VARCHAR)){
if (StringUtils.isEmpty(value)){
private void setOutParameter(int index,CallableStatement stmt,DataType dataType,String value) throws Exception {
if (dataType.equals(DataType.VARCHAR)) {
if (StringUtils.isEmpty(value)) {
stmt.registerOutParameter(index, Types.VARCHAR);
}else {
} else {
stmt.registerOutParameter(index, Types.VARCHAR, value);
}

}else if (dataType.equals(INTEGER)){
if (StringUtils.isEmpty(value)){
} else if (dataType.equals(DataType.INTEGER)) {
if (StringUtils.isEmpty(value)) {
stmt.registerOutParameter(index, Types.INTEGER);
}else {
} else {
stmt.registerOutParameter(index, Types.INTEGER, value);
}

}else if (dataType.equals(LONG)){
if (StringUtils.isEmpty(value)){
} else if (dataType.equals(DataType.LONG)) {
if (StringUtils.isEmpty(value)) {
stmt.registerOutParameter(index,Types.INTEGER);
}else {
stmt.registerOutParameter(index,Types.INTEGER ,value);
} else {
stmt.registerOutParameter(index,Types.INTEGER,value);
}
}else if (dataType.equals(FLOAT)){
if (StringUtils.isEmpty(value)){
} else if (dataType.equals(DataType.FLOAT)) {
if (StringUtils.isEmpty(value)) {
stmt.registerOutParameter(index, Types.FLOAT);
}else {
} else {
stmt.registerOutParameter(index, Types.FLOAT,value);
}
}else if (dataType.equals(DOUBLE)){
if (StringUtils.isEmpty(value)){
} else if (dataType.equals(DataType.DOUBLE)) {
if (StringUtils.isEmpty(value)) {
stmt.registerOutParameter(index, Types.DOUBLE);
}else {
stmt.registerOutParameter(index, Types.DOUBLE , value);
} else {
stmt.registerOutParameter(index, Types.DOUBLE, value);
}

}else if (dataType.equals(DATE)){
if (StringUtils.isEmpty(value)){
} else if (dataType.equals(DataType.DATE)) {
if (StringUtils.isEmpty(value)) {
stmt.registerOutParameter(index, Types.DATE);
}else {
stmt.registerOutParameter(index, Types.DATE , value);
} else {
stmt.registerOutParameter(index, Types.DATE, value);
}

}else if (dataType.equals(TIME)){
if (StringUtils.isEmpty(value)){
} else if (dataType.equals(DataType.TIME)) {
if (StringUtils.isEmpty(value)) {
stmt.registerOutParameter(index, Types.TIME);
}else {
stmt.registerOutParameter(index, Types.TIME , value);
} else {
stmt.registerOutParameter(index, Types.TIME, value);
}

}else if (dataType.equals(TIMESTAMP)){
if (StringUtils.isEmpty(value)){
} else if (dataType.equals(DataType.TIMESTAMP)) {
if (StringUtils.isEmpty(value)) {
stmt.registerOutParameter(index, Types.TIMESTAMP);
}else {
stmt.registerOutParameter(index, Types.TIMESTAMP , value);
} else {
stmt.registerOutParameter(index, Types.TIMESTAMP, value);
}

}else if (dataType.equals(BOOLEAN)){
if (StringUtils.isEmpty(value)){
} else if (dataType.equals(DataType.BOOLEAN)) {
if (StringUtils.isEmpty(value)) {
stmt.registerOutParameter(index, Types.BOOLEAN);
}else {
stmt.registerOutParameter(index, Types.BOOLEAN , value);
} else {
stmt.registerOutParameter(index, Types.BOOLEAN, value);
}
}
}
Expand Down
Loading