Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[1.3_beta][taier-all] fix sql #818

Merged
merged 1 commit into from
Oct 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 19 additions & 29 deletions sql/1.3/1.3_increment.sql

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,7 @@ public interface CommonConstant {
String TASK_NAME_PREFIX = "run_%s_task_%s";

String DATASOURCE_PREFIX = "taier.datasource.";
String DATASOURCE_ID = "datasourceId";
String DATASOURCE_TYPE = "datasourceType";

String RDB_SUBMIT_QUEUE_SIZE = "jobQueueSize";

String RDB_SUBMIT_CONSUMER_MIN_NUM = "minJobPoolSize";

String RDB_SUBMIT_CONSUMER_MAX_NUM = "maxJobPoolSize";
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,6 @@ public interface PatternConstant {

String FUNCTION_PATTERN = "[a-z0-9_]+";

/** 字符串查找是否存在密码字段的正则表达式 **/
String PASSWORD_FIELD_REGEX = "\"(pass(word)?|accesskey)\"\\s*:\\s*\"\\*{6}\"";

/**
* 正则: 租户名称正则表达式,字母、数字、下划线组成,且长度不超过64个字符
* Regular: Tenant name regular expression, consisting of letters, numbers, and underscores, and the length does not exceed 64 characters
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import io.swagger.annotations.Api;
import io.swagger.annotations.ApiOperation;
import io.swagger.annotations.ApiParam;
import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.springframework.beans.factory.annotation.Autowired;
Expand All @@ -44,18 +45,18 @@

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;

/**
* 新增数据源相关功能控制器类
*
* @description:
* @author: liuxx
* @date: 2021/3/8
*/
@Api(tags = {"数据源中心-新增数据源"})
@RestController
@RequestMapping(value ="/dataSource/addDs")
@RequestMapping(value = "/dataSource/addDs")
public class DatasourceAddController {

private final String RESOURCE = "resource";
Expand Down Expand Up @@ -95,7 +96,6 @@ public R<List<DsVersionVO>> queryDsVersionByType(@RequestBody DsVersionSearchPar
}



@ApiOperation("测试联通性")
@PostMapping("/testCon")
public R<Boolean> testCon(@RequestBody AddDataSourceParam addDataSourceParam) {
Expand All @@ -105,6 +105,7 @@ public R<Boolean> testCon(@RequestBody AddDataSourceParam addDataSourceParam) {
protected void checkParams() throws IllegalArgumentException {
Asserts.hasText(addDataSourceParam.getDataType(), "数据源类型不能为空!");
}

@Override
protected Boolean process() throws RdosDefineException {
DataSourceVO dataSourceVO = new DataSourceParam2SourceVOConverter().convert(addDataSourceParam);
Expand All @@ -127,6 +128,7 @@ protected void checkParams() throws IllegalArgumentException {
String principal = (String) dataSourceJson.get("principal");
Asserts.hasText(principal, "kerberos principle不能为空!");
}

@Override
protected Boolean process() throws RdosDefineException {
Pair<String, String> resource = (Pair<String, String>) params.get("resource");
Expand All @@ -139,18 +141,18 @@ protected Boolean process() throws RdosDefineException {
}



@ApiOperation("添加和修改数据源")
@PostMapping("/addOrUpdateSource")
public R<Long> addOrUpdateSource(@RequestBody AddDataSourceParam addDataSourceParam) {
return new APITemplate<Long>() {
@Override
protected void checkParams() throws IllegalArgumentException {
if (addDataSourceParam == null ||
StringUtils.isBlank(addDataSourceParam.getDataName())){
StringUtils.isBlank(addDataSourceParam.getDataName())) {
throw new PubSvcDefineException("dataSource name empty");
}
}

@Override
protected Long process() throws RdosDefineException {
DataSourceVO dataSourceVO = new DataSourceParam2SourceVOConverter().convert(addDataSourceParam);
Expand All @@ -172,13 +174,14 @@ protected void checkParams() throws IllegalArgumentException {
String principal = (String) dataSourceJson.get("principal");
Asserts.hasText(principal, "kerberos principle不能为空!");
}

@Override
protected Long process() throws RdosDefineException {
Pair<String, String> resource = (Pair<String, String>) params.get("resource");
params.remove(RESOURCE);
DataSourceVO dataSourceVo = PublicUtil.mapToObject(params, DataSourceVO.class);
if (dataSourceVo == null ||
StringUtils.isBlank(dataSourceVo.getDataName())){
StringUtils.isBlank(dataSourceVo.getDataName())) {
throw new PubSvcDefineException("dataSource name empty");
}
params.put(RESOURCE, resource);
Expand Down Expand Up @@ -227,14 +230,14 @@ public R<Set<JSONObject>> columnForSyncopate(@RequestBody DevelopDataSourceColum
return new APITemplate<Set<JSONObject>>() {
@Override
protected void checkParams() throws IllegalArgumentException {
if(Objects.isNull(vo.getTableName())){
if (CollectionUtils.isEmpty(vo.getTableName())) {
throw new RdosDefineException("table can not be null");
}
}

@Override
protected Set<JSONObject> process() {
return datasourceService.columnForSyncopate(vo.getSourceId(), vo.getTableName(), vo.getSchema());
return datasourceService.columnForSyncopate(vo.getSourceId(), vo.getTableName().get(0), vo.getSchema());
}
}.execute();
}
Expand All @@ -256,7 +259,7 @@ public R<JSONObject> preview(@RequestBody DevelopDataSourcePreviewVO vo) {
return new APITemplate<JSONObject>() {
@Override
protected JSONObject process() {
return datasourceService.preview(vo.getSourceId(), vo.getTableName(),vo.getSchema());
return datasourceService.preview(vo.getSourceId(), vo.getTableName(), vo.getSchema());
}
}.execute();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -561,7 +561,7 @@ public PageResult<List<TaskListResultVO>> getTaskList(TaskSearchVO taskSearchVO)
}
}
//任务状态为null 设置为未提交状态
vo.setStatus(status != null ? status : TaskStatus.UNSUBMIT.getStatus());
vo.setStatus(TaskStatus.getShowStatus(status != null ? status : TaskStatus.UNSUBMIT.getStatus()));

if (CollectionUtils.isEmpty(taskSearchVO.getStatusList()) || taskSearchVO.getStatusList().contains(vo.getStatus())) {
vo.setSubmitModified(task.getGmtModified());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,26 +5,24 @@
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.dtstack.taier.datasource.api.base.ClientCache;
import com.dtstack.taier.datasource.api.client.IClient;
import com.dtstack.taier.datasource.api.dto.SqlQueryDTO;
import com.dtstack.taier.datasource.api.dto.source.ISourceDTO;
import com.dtstack.taier.common.constant.CommonConstant;
import com.dtstack.taier.common.enums.Deleted;
import com.dtstack.taier.common.enums.EScheduleJobType;
import com.dtstack.taier.common.exception.RdosDefineException;
import com.dtstack.taier.dao.domain.DsInfo;
import com.dtstack.taier.dao.domain.TaskDirtyDataManage;
import com.dtstack.taier.develop.datasource.convert.load.SourceLoaderService;
import com.dtstack.taier.dao.mapper.TaskDirtyDataManageMapper;
import com.dtstack.taier.datasource.api.base.ClientCache;
import com.dtstack.taier.datasource.api.client.IClient;
import com.dtstack.taier.datasource.api.dto.SqlQueryDTO;
import com.dtstack.taier.datasource.api.dto.source.ISourceDTO;
import com.dtstack.taier.develop.datasource.convert.load.SourceLoaderService;
import com.dtstack.taier.develop.enums.develop.TaskDirtyDataManageParamEnum;
import com.dtstack.taier.develop.enums.develop.TaskDirtyOutPutTypeEnum;
import com.dtstack.taier.develop.mapstruct.vo.TaskDirtyDataManageTransfer;
import com.dtstack.taier.develop.service.datasource.impl.DatasourceService;
import com.dtstack.taier.develop.service.datasource.impl.DsInfoService;
import com.dtstack.taier.develop.vo.develop.query.TaskDirtyDataManageVO;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

Expand All @@ -38,19 +36,14 @@
* @Date: 2022/06/14/2:52 PM
*/
@Service
public class TaskDirtyDataManageService extends ServiceImpl<TaskDirtyDataManageMapper, TaskDirtyDataManage> implements TaskDirtyDataManageIService<TaskDirtyDataManage>{

private static final Logger logger = LoggerFactory.getLogger(TaskDirtyDataManageService.class);
public class TaskDirtyDataManageService extends ServiceImpl<TaskDirtyDataManageMapper, TaskDirtyDataManage> implements TaskDirtyDataManageIService<TaskDirtyDataManage> {

@Autowired
private TaskDirtyDataManageIService taskDirtyDataIService;

@Autowired
private DsInfoService dsInfoService;

@Autowired
private DatasourceService datasourceService;

@Autowired
private SourceLoaderService sourceLoaderService;

Expand All @@ -76,8 +69,8 @@ public TaskDirtyDataManage getOneByTaskId(Long taskId) {
* 添加或修改任务脏数据管理
*
* @param vo
* @param tenantId 租户 id
* @param taskId 任务 id
* @param tenantId 租户 id
* @param taskId 任务 id
*/
public void addOrUpdateDirtyDataManage(TaskDirtyDataManageVO vo, Long tenantId, Long taskId) {
// 先删除原有的脏数据管理
Expand All @@ -87,7 +80,7 @@ public void addOrUpdateDirtyDataManage(TaskDirtyDataManageVO vo, Long tenantId,
taskDirtyDataManage.setTenantId(tenantId);
taskDirtyDataManage.setGmtCreate(new Timestamp(System.currentTimeMillis()));
taskDirtyDataManage.setGmtModified(new Timestamp(System.currentTimeMillis()));
if(Objects.equals(TaskDirtyOutPutTypeEnum.LOG.getValue(),taskDirtyDataManage.getOutputType())){
if (Objects.equals(TaskDirtyOutPutTypeEnum.LOG.getValue(), taskDirtyDataManage.getOutputType())) {
taskDirtyDataManage.setLinkInfo("{}");
}
taskDirtyDataIService.save(taskDirtyDataManage);
Expand All @@ -103,15 +96,18 @@ public void buildTaskDirtyDataManageArgs(Integer taskType, Long taskId, JSONObje
confProp.put(TaskDirtyDataManageParamEnum.MAX_ROWS.getParam(), byTaskId.getMaxRows());
confProp.put(TaskDirtyDataManageParamEnum.MAX_COLLECT_FAILED_ROWS.getParam(), byTaskId.getMaxCollectFailedRows());
confProp.put(TaskDirtyDataManageParamEnum.LOG_PRINT_INTERVAL.getParam(), byTaskId.getLogPrintInterval());

if (Objects.equals(byTaskId.getOutputType(), "jdbc")) {
JSONObject dirtyDataJSON = JSONObject.parseObject(byTaskId.getLinkInfo());
Long srcId = Long.parseLong(dirtyDataJSON.getString("sourceId"));
confProp.put(CommonConstant.DATASOURCE_ID, srcId);
DsInfo dsInfo = dsInfoService.getOneById(srcId);
JSONObject dataJson = JSON.parseObject(dsInfo.getDataJson());
confProp.put(TaskDirtyDataManageParamEnum.URL.getParam(), dataJson.getString("jdbcUrl"));
confProp.put(TaskDirtyDataManageParamEnum.USERNAME.getParam(), dataJson.getString("username"));
confProp.put(TaskDirtyDataManageParamEnum.PASSWORD.getParam(), dataJson.getString("password"));
String table = dirtyDataJSON.getString(TaskDirtyDataManageParamEnum.TABLE.name().toLowerCase());
confProp.put(CommonConstant.DATASOURCE_TYPE, dsInfo.getDataTypeCode());
if (StringUtils.isNotBlank(table)) {
confProp.put(TaskDirtyDataManageParamEnum.TABLE.getParam(), table);
} else {
Expand Down Expand Up @@ -156,14 +152,7 @@ public void createTable(Long sourceId) {
*/
private boolean checkDirtyTableExist(ISourceDTO sourceDTO) {
IClient client = ClientCache.getClient(sourceDTO.getSourceType());
try {
client.getTable(sourceDTO, SqlQueryDTO.builder().tableName(TaskDirtyDataManageParamEnum.TABLE.getDefaultValue()).build());
} catch (Exception e) {
if (e.getMessage().contains("doesn't exist")) {
return false;
}
throw new RdosDefineException("创建脏数据表失败", e);
}
return true;
String currentDatabase = client.getCurrentDatabase(sourceDTO);
return client.isTableExistsInDatabase(sourceDTO, TaskDirtyDataManageParamEnum.TABLE.getDefaultValue(), currentDatabase);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;

import java.util.List;

/**
* 返回切分键需要的列名
*
Expand All @@ -34,11 +36,8 @@ public class DevelopDataSourceColumnForSyncopateVO extends DtInsightAuthParam {
@ApiModelProperty(value = "数据源id", example = "1", required = true)
private Long sourceId;

// @ApiModelProperty(value = "表名称", required = true)
// private List<String> tableName;

@ApiModelProperty(value = "表名称", required = true)
private String tableName;
private List<String> tableName;

@ApiModelProperty(value = "查询的schema", example = "test")
private String schema;
Expand All @@ -54,13 +53,13 @@ public void setSourceId(Long sourceId) {
this.sourceId = sourceId;
}

// public List<String> getTableName() {
// return tableName;
// }
//
// public void setTableName(List<String> tableName) {
// this.tableName = tableName;
// }
public List<String> getTableName() {
return tableName;
}

public void setTableName(List<String> tableName) {
this.tableName = tableName;
}

public String getSchema() {
return schema;
Expand All @@ -79,12 +78,4 @@ public Long getUserId() {
public void setUserId(Long userId) {
this.userId = userId;
}

public String getTableName() {
return tableName;
}

public void setTableName(String tableName) {
this.tableName = tableName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import com.alibaba.fastjson.JSONObject;
import com.dtstack.taier.common.BlockCallerPolicy;
import com.dtstack.taier.common.constant.CommonConstant;
import com.dtstack.taier.common.enums.EJobCacheStage;
import com.dtstack.taier.common.enums.EJobClientType;
import com.dtstack.taier.common.enums.EScheduleJobType;
Expand All @@ -47,7 +46,6 @@
import com.dtstack.taier.scheduler.server.JobPartitioner;
import com.dtstack.taier.scheduler.server.queue.GroupInfo;
import com.dtstack.taier.scheduler.server.queue.GroupPriorityQueue;
import com.dtstack.taier.scheduler.service.ComponentService;
import com.dtstack.taier.scheduler.service.ScheduleJobCacheService;
import com.dtstack.taier.scheduler.service.ScheduleJobExpandService;
import org.apache.commons.lang3.StringUtils;
Expand All @@ -63,8 +61,6 @@
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* company: www.dtstack.com
Expand Down Expand Up @@ -159,7 +155,7 @@ public void run() {
if (simpleJobDelay != null && jobClient != null) {
LOGGER.error("jobId:{} stage:{}", jobClient.getJobId(), simpleJobDelay.getStage(), e);
} else {
LOGGER.error("", e);
LOGGER.error("restartJob take error", e);
}
}
}
Expand Down Expand Up @@ -196,8 +192,9 @@ public int getDelayJobQueueSize() {
@Override
public void run() {
while (true) {
JobClient jobClient = null;
try {
JobClient jobClient = queue.take();
jobClient = queue.take();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("jobId:{} jobResource:{} queue size:{} take job from priorityQueue.", jobClient.getJobId(), jobResource, queue.size());
}
Expand All @@ -219,9 +216,15 @@ public void run() {
}

//提交任务
jobSubmitConcurrentService.submit(() -> submitJob(jobClient));
JobClient finalJobClient = jobClient;
jobSubmitConcurrentService.submit(() -> submitJob(finalJobClient));
} catch (Exception e) {
LOGGER.error("", e);
if (null != jobClient) {
LOGGER.error("jobId {} submit error", jobClient.getJobId(), e);
handlerFailedWithRetry(jobClient, false, e);
} else {
LOGGER.error("submit error", e);
}
}
}
}
Expand Down
Loading