Skip to content

Commit

Permalink
feature: supported ftp source reader and target (#915)
Browse files Browse the repository at this point in the history
- suppoorted ftp source reader and target

### Related PRs
Please make sure acesss this PR with #911
  • Loading branch information
Jixiangup authored Dec 8, 2022
1 parent 5da5aa3 commit da7d303
Show file tree
Hide file tree
Showing 31 changed files with 1,648 additions and 22 deletions.
15 changes: 15 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
<httpclient.version>4.5.2</httpclient.version>
<mybatis-plus.version>3.4.0</mybatis-plus.version>
<hive.version>2.1.0</hive.version>
<poi.version>5.0.0</poi.version>
<taier.datasource.version>1.0.0</taier.datasource.version>
<frontend-maven-plugin.version>1.12.1</frontend-maven-plugin.version>
<maven-antrun-plugin.version>3.1.0</maven-antrun-plugin.version>
Expand Down Expand Up @@ -118,6 +119,20 @@
<scope>test</scope>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.poi/poi -->
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi</artifactId>
<version>${poi.version}</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.poi/poi-ooxml -->
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi-ooxml</artifactId>
<version>${poi.version}</version>
</dependency>

<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
Expand Down
4 changes: 4 additions & 0 deletions sql/1.3.1/increment.sql

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion sql/init.sql

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.dtstack.taier.common.enums;

import com.dtstack.taier.common.exception.TaierDefineException;

import java.util.List;
import java.util.Locale;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/**
*
* @since 1.3.1
*/
public enum EFTPTaskFileType {
EXCEL(Stream.of(
".xls",
".xlsx"
).collect(Collectors.toList()), "EXCEL"),

CSV(Stream.of(
".csv"
).collect(Collectors.toList()), "CSV"),

TXT(Stream.of(
".txt"
).collect(Collectors.toList()), "TXT"),
;

private final List<String> filetypes;

private final String taskFiletype;

EFTPTaskFileType(List<String> filetypes, String taskFiletype) {
this.filetypes = filetypes;
this.taskFiletype = taskFiletype;
}

public List<String> getFiletypes() {
return filetypes;
}

public String getTaskFiletype() {
return taskFiletype;
}

public static EFTPTaskFileType filetype(String filetype) {
filetype = filetype.toLowerCase(Locale.ROOT);
EFTPTaskFileType[] values = values();
for (EFTPTaskFileType value : values) {
if (value.filetypes.contains(filetype)) {
return value;
}
}
throw new TaierDefineException("unsupported ftp task file format");
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.dtstack.taier.common.exception.DtCenterDefException;
import com.dtstack.taier.common.exception.ErrorCode;
import com.dtstack.taier.common.exception.TaierDefineException;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;

Expand Down Expand Up @@ -48,4 +49,24 @@ public static void mkdirsIfNotExist(String directoryPath) {
}
}
}

public static String getFilename(String filepath) {
if (StringUtils.isEmpty(filepath)) {
throw new TaierDefineException("filepath cannot be empty");
}
int lastIndexOf = filepath.lastIndexOf(File.separator);
if (lastIndexOf == filepath.length() - 1 || lastIndexOf == -1) {
throw new TaierDefineException("file does not exist");
}
return filepath.substring(lastIndexOf);
}

public static String getFiletype(String filepath) {
String filename = getFilename(filepath);
int lastIndexOf = filename.lastIndexOf(".");
if (lastIndexOf == filename.length() -1 || lastIndexOf == -1) {
throw new TaierDefineException("file type is undefined");
}
return filename.substring(lastIndexOf + 1);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import com.alibaba.fastjson.JSON;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang3.StringUtils;

import java.util.HashMap;
import java.util.Map;
Expand All @@ -43,7 +42,7 @@ public class MapUtil {
*/
public static void buildMap(String key, String fieldDelimiter, Object value, Map<String, Object> data) {
String[] split = new String[1];
if (StringUtils.isBlank(fieldDelimiter)) {
if (org.apache.commons.lang3.StringUtils.isBlank(fieldDelimiter)) {
split[0] = key;
} else {
split = key.split(StringUtil.escapeExprSpecialWord(fieldDelimiter));
Expand Down Expand Up @@ -86,7 +85,7 @@ public static Object getValueByKey(Map<String, Object> map, String key, String f
}
Object o = null;
String[] split = new String[1];
if (StringUtils.isBlank(fieldDelimiter)) {
if (org.apache.commons.lang3.StringUtils.isBlank(fieldDelimiter)) {
split[0] = key;
} else {
split = key.split(StringUtil.escapeExprSpecialWord(fieldDelimiter));
Expand Down Expand Up @@ -138,7 +137,7 @@ public static void putIfValueNotNull(Map<String, Object> params, String key, Obj
* @param value value
*/
public static void putIfValueNotEmpty(Map<String, Object> params, String key, String value) {
if (Objects.nonNull(params) && StringUtils.isNotEmpty(value)) {
if (Objects.nonNull(params) && org.apache.commons.lang3.StringUtils.isNotEmpty(value)) {
params.put(key, value);
}
}
Expand All @@ -151,7 +150,7 @@ public static void putIfValueNotEmpty(Map<String, Object> params, String key, St
* @param value value
*/
public static void putIfValueNotBlank(Map<String, String> params, String key, String value) {
if (Objects.nonNull(params) && StringUtils.isNotBlank(value)) {
if (Objects.nonNull(params) && org.apache.commons.lang3.StringUtils.isNotBlank(value)) {
params.put(key, value);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
* 正则表达式校验工具
* Regular expression check utils
*
* @author bnyte
*
* @date 2022/5/2 16:03
*/
public class RegexUtils {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@

import org.apache.commons.lang3.StringUtils;

public class StringUtil {
public abstract class StringUtil {

/**
* 转义正则特殊字符 ($()*+.[]?\^{},|)
Expand Down
11 changes: 11 additions & 0 deletions taier-data-develop/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,17 @@
</exclusions>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.poi/poi -->
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi</artifactId>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.poi/poi-ooxml -->
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi-ooxml</artifactId>
</dependency>

<dependency>
<groupId>net.minidev</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,18 @@ public interface Writer extends CheckFormat {
JSONObject toWriterJson();

String toWriterJsonString();

/**
* Whether the current write data source needs to rewrite the write mode,
* and regenerate the required mode through the replace and insert passed in from the front end
* For example: the front end of the FTP writer passes in replace and insert,
* but chunjun needs overwrite and append, so it needs to be rewritten
* default is not rewritten
*
* @return If you return True, you need to rewrite the write Mode,
* and if you return False, you don't need to rewrite the write Mode
*/
default boolean resetWriteMode() {
return false;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,18 @@
import com.dtstack.taier.develop.vo.develop.query.DevelopTaskGetComponentVersionVO;
import com.dtstack.taier.develop.vo.develop.query.DevelopTaskGetSupportJobTypesVO;
import com.dtstack.taier.develop.vo.develop.query.DevelopTaskNameCheckVO;
import com.dtstack.taier.develop.vo.develop.query.DevelopTaskParsingFTPFileParamVO;
import com.dtstack.taier.develop.vo.develop.query.DevelopTaskPublishTaskVO;
import com.dtstack.taier.develop.vo.develop.query.DevelopTaskResourceParamVO;
import com.dtstack.taier.develop.vo.develop.result.DevelopAllProductGlobalReturnVO;
import com.dtstack.taier.develop.vo.develop.result.DevelopGetChildTasksResultVO;
import com.dtstack.taier.develop.vo.develop.result.DevelopSysParameterResultVO;
import com.dtstack.taier.develop.vo.develop.result.DevelopTaskGetComponentVersionResultVO;
import com.dtstack.taier.develop.vo.develop.result.DevelopTaskTypeVO;
import com.dtstack.taier.develop.vo.develop.result.DevelopTaskGetTaskByIdResultVO;
import com.dtstack.taier.develop.vo.develop.result.DevelopTaskPublishTaskResultVO;
import com.dtstack.taier.develop.vo.develop.result.DevelopTaskResultVO;
import com.dtstack.taier.develop.vo.develop.result.DevelopTaskTypeVO;
import com.dtstack.taier.develop.vo.develop.result.ParsingFTPFileVO;
import com.dtstack.taier.develop.vo.develop.result.TaskCatalogueResultVO;
import com.dtstack.taier.scheduler.service.ScheduleTaskShadeService;
import com.google.common.base.Preconditions;
Expand All @@ -64,8 +66,10 @@
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Objects;

@Api(value = "任务管理", tags = {"任务管理"})
Expand Down Expand Up @@ -311,6 +315,10 @@ public R<JSONObject> getSyncProperties() {
return R.ok(developTaskService.getSyncProperties());
}


@PostMapping(value = "/parsing_ftp_columns")
@ApiOperation("数据开发-解析ftp任务字段列表")
public R<ParsingFTPFileVO> parsingFtpTaskFile(@RequestBody DevelopTaskParsingFTPFileParamVO payload) throws IOException {
return R.ok(developTaskService.parsingFtpTaskFile(payload));
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.dtstack.taier.develop.enums.develop;

import com.dtstack.taier.common.enums.DataSourceTypeEnum;
import com.dtstack.taier.common.exception.TaierDefineException;

/**
*
* @since 1.0.0
*/
public enum EWriterMode {

FTP(DataSourceTypeEnum.FTP.getVal()) {
@Override
public String rewriterWriterMode(String frontendParam) {
if ("replace".equals(frontendParam)) {
return "overwrite";
} else if ("insert".equals(frontendParam)){
return "append";
}
throw new TaierDefineException("writer mode not found on the " + frontendParam + ", maybe you can try replace or insert");
}
},
;

public abstract String rewriterWriterMode(String frontendParam);

public static EWriterMode sourceType(Integer typeCode) {
for (EWriterMode value : values()) {
if (value.sourceType.equals(typeCode)) {
return value;
}
}
throw new TaierDefineException("unsupported data source type");
}

/**
* data source type var
* @see DataSourceTypeEnum
*/
private final Integer sourceType;

EWriterMode(Integer sourceType) {
this.sourceType = sourceType;
}

public Integer getSourceType() {
return sourceType;
}

}
Loading

0 comments on commit da7d303

Please sign in to comment.