feature: supported ftp source reader and target #915

Merged
merged 12 commits into from
Dec 8, 2022

Conversation

Jixiangup
Collaborator

@Jixiangup Jixiangup commented Nov 24, 2022

  • supported ftp source reader and target

Related PRs

Please make sure to review this PR together with #911

@Jixiangup
Collaborator Author

check this

@FlechazoW
Member

Why is this PR based on branch DTstack:feat/ftp rather than branch master?

@Jixiangup Jixiangup changed the base branch from feat/ftp to master November 24, 2022 05:46
@Jixiangup
Collaborator Author

Jixiangup commented Nov 24, 2022

Why is this PR based on branch DTstack:feat/ftp rather than branch master?

Because the front end wanted to submit a PR to merge into that branch. I have changed the base to master.

@Jixiangup Jixiangup linked an issue Nov 24, 2022 that may be closed by this pull request
3 tasks
@Jixiangup Jixiangup added feature New feature v1.3.1 labels Nov 24, 2022
sql/1.3.1/increment.sql Outdated
@Jixiangup Jixiangup requested a review from vainhope November 25, 2022 05:46
Comment on lines 32 to 33
import com.dtstack.taier.develop.vo.develop.query.*;
import com.dtstack.taier.develop.vo.develop.result.*;
Member

Do not import *.

Comment on lines +83 to +93
<!-- https://mvnrepository.com/artifact/org.apache.poi/poi -->
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi</artifactId>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.poi/poi-ooxml -->
<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi-ooxml</artifactId>
</dependency>
Member

What is the purpose of depending on apache.poi?

@Jixiangup
Collaborator Author

Jixiangup commented Nov 25, 2022

  • Unified ConnectionUtil and StringUtil
  • Removed author tags
  • Removed import *

import com.dtstack.taier.common.enums.EScheduleJobType;
import com.dtstack.taier.common.enums.EScheduleStatus;
import com.dtstack.taier.common.enums.ResourceRefType;
import com.dtstack.taier.common.enums.*;
Member

Do not import *.

import org.apache.commons.collections.CollectionUtils;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.BooleanUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.poi.ss.usermodel.*;
Member

Do not import *.

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

import java.io.*;
Member

Do not import *.

import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.*;
Member

Do not import *.

Comment on lines 1464 to 1498
private List<FTPColumn> getTxtColumns(DevelopTaskParsingFTPFileParamVO payload, InputStream fileInputStream) {
List<FTPColumn> columns = new ArrayList<>();
try {
// SFTP quirk: closing the ChannelSftp also closes any stream obtained from it, so the ChannelSftp is exposed here to obtain the stream
BufferedReader bis = new BufferedReader(new InputStreamReader(fileInputStream, payload.getEncoding()));
// line counter; only the first line is inspected
int limit = 1;
String line = null;
while ((line = bis.readLine()) != null && limit <= 1) {
String[] split = line.split(payload.getColumnSeparator());
if (payload.getFirstColumnName() && limit == 1) {
for (int i = 0; i < split.length; i++) {
FTPColumn ftpColumn = new FTPColumn();
ftpColumn.setName(split[i]);
ftpColumn.setType("string");
ftpColumn.setIndex(i);
columns.add(ftpColumn);
}
} else if (!payload.getFirstColumnName() && limit == 1) {
for (int i = 0; i < split.length; i++) {
FTPColumn ftpColumn = new FTPColumn();
ftpColumn.setName("column" + i);
ftpColumn.setType("string");
ftpColumn.setIndex(i);
columns.add(ftpColumn);
}
}
limit++;
}
} catch (IOException e) {
throw new RuntimeException(e);
}
return columns;
}
Member

Unclosed stream. Check the code, please.
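
One common way to address an unclosed stream like this is try-with-resources. The sketch below is only an illustration under stated assumptions, not the code that was merged: the class name `FirstLineColumns`, the `parse` signature, and returning plain `String` names instead of `FTPColumn` objects are hypothetical choices made to keep it self-contained. It also quotes the separator with `Pattern.quote`, which the snippet under review does not do.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.nio.charset.Charset;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Taier: derives column names from the first line only.
final class FirstLineColumns {

    static List<String> parse(InputStream in, String encoding, String separator, boolean firstLineIsHeader) {
        List<String> names = new ArrayList<>();
        // try-with-resources closes the reader, and with it the wrapped stream,
        // on both the normal path and the exception path.
        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(in, Charset.forName(encoding)))) {
            String line = reader.readLine();
            if (line == null) {
                return names; // empty file: no columns
            }
            // Assumption: the separator is a literal string, so it is quoted before splitting.
            String[] cells = line.split(Pattern.quote(separator), -1);
            for (int i = 0; i < cells.length; i++) {
                names.add(firstLineIsHeader ? cells[i] : "column" + i);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return names;
    }
}
```

Because closing the reader also closes the underlying stream, this shape only fits if the caller keeps the `ChannelSftp` open until parsing has finished, in line with the SFTP note in the code comment above.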

import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.*;
Member

Do not import *.

@vainhope
Collaborator

The init SQL is missing this SQL change.

@Jixiangup
Collaborator Author

The init SQL is missing this SQL change.

check it

@Jixiangup
Collaborator Author

Jixiangup commented Nov 28, 2022

  • fixed all import *
  • deleted my own CollectionUtils and StringUtils
  • uniformly use Apache Commons Lang 3's StringUtils and MapUtils
  • deleted unused classes
  • deleted the newly added author tags

@FlechazoW
Member

CC. @vainhope @jiemotongxue

@vainhope
Collaborator

vainhope commented Dec 6, 2022

FTP manual field mapping

column is empty

2022-12-06 20:16:25:org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Field names must be unique. Duplicate field: 'null'
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Field names must be unique. Duplicate field: 'null'
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:371)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:224)
at org.apache.flink.client.program.PackagedProgramUtils.getPipelineFromProgram(PackagedProgramUtils.java:158)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:82)
at org.apache.flink.client.program.PackagedProgramUtils.createJobGraph(PackagedProgramUtils.java:117)
at com.dtstack.taier.flink.FlinkClient.submitJobWithJar(FlinkClient.java:395)
at com.dtstack.taier.flink.FlinkClient.submitSyncJob(FlinkClient.java:337)
at com.dtstack.taier.flink.FlinkClient.lambda$processSubmitJobWithType$1(FlinkClient.java:265)
at com.dtstack.taier.base.util.KerberosUtils.login(KerberosUtils.java:140)
at com.dtstack.taier.base.util.KerberosUtils.login(KerberosUtils.java:238)
at com.dtstack.taier.flink.FlinkClient.processSubmitJobWithType(FlinkClient.java:252)
at com.dtstack.taier.pluginapi.client.AbstractClient.submitJob(AbstractClient.java:59)
at com.dtstack.taier.common.client.ClientProxy$2.execute(ClientProxy.java:103)
at com.dtstack.taier.common.client.ClientProxy$2.execute(ClientProxy.java:99)
at com.dtstack.taier.pluginapi.callback.ClassLoaderCallBackMethod.callbackAndReset(ClassLoaderCallBackMethod.java:31)
at com.dtstack.taier.common.client.ClientProxy.lambda$submitJob$1(ClientProxy.java:99)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.ValidationException: Field names must be unique. Duplicate field: 'null'
at org.apache.flink.table.api.TableSchema.validateAndCreateNameToTypeMapping(TableSchema.java:501)
at org.apache.flink.table.api.TableSchema.validateColumnsAndWatermarkSpecs(TableSchema.java:412)
at org.apache.flink.table.api.TableSchema.access$100(TableSchema.java:59)
at org.apache.flink.table.api.TableSchema$Builder.build(TableSchema.java:695)
at com.dtstack.chunjun.util.TableUtil.createTableSchema(TableUtil.java:188)
at com.dtstack.chunjun.util.TableUtil.createRowType(TableUtil.java:170)
at com.dtstack.chunjun.connector.ftp.source.FtpSourceFactory.createSource(FtpSourceFactory.java:82)
at com.dtstack.chunjun.Main.exeSyncJob(Main.java:186)
at com.dtstack.chunjun.Main.main(Main.java:122)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:354)
... 19 more

image
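
The trace suggests that at least two columns reached Flink's `TableSchema` with the name `null`. A pre-submit check along the following lines could surface that earlier with a clearer message. This is a hedged sketch only: `ColumnNameCheck` and `findProblems` are hypothetical names, not part of Taier or ChunJun, and the assumption that manually mapped columns arrive without a name is inferred from the error text rather than confirmed.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical validator: reports missing and duplicate column names before a job is submitted.
final class ColumnNameCheck {

    /** Returns one message per problem; an empty list means the names are usable. */
    static List<String> findProblems(List<String> names) {
        List<String> problems = new ArrayList<>();
        Set<String> seen = new HashSet<>();
        for (int i = 0; i < names.size(); i++) {
            String name = names.get(i);
            if (name == null || name.trim().isEmpty()) {
                // A missing name is reported by position, since there is nothing else to show.
                problems.add("column " + i + " has no name");
            } else if (!seen.add(name)) {
                // Set.add returns false when the name was already present.
                problems.add("duplicate column name: " + name);
            }
        }
        return problems;
    }
}
```

With such a check, a mapping whose names are `["id", null, null]` would be rejected with two "has no name" messages instead of failing later with `Duplicate field: 'null'`.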

@Jixiangup
Collaborator Author

FTP manual field mapping

column is empty


I will look into it when I have time!

*/
public abstract class CollectionUtils {

public static boolean isEmpty(Collection<?> collection) {
Collaborator

commonUtils

@Override
public TaskResourceParam beforeProcessing(TaskResourceParam taskResourceParam) {
Map<String, Object> sourceMap = taskResourceParam.getSourceMap();
assertSourceParam(sourceMap);
Collaborator

When a new task is created, the path must be null.

@vainhope vainhope changed the title feature: suppoorted ftp source reader and target feature: supported ftp source reader and target Dec 8, 2022
Collaborator

@mortalYoung mortalYoung left a comment

+1

@vainhope vainhope merged commit da7d303 into DTStack:master Dec 8, 2022
vainhope pushed a commit that referenced this pull request Dec 8, 2022
### Introduction
- Data sync tasks support FTP as both the input and the output end


### Related PRs and Issues
#897
Please make sure to review this PR together with #915
Labels
feature New feature
Development

Successfully merging this pull request may close these issues.

[Feature]FTP source and target supported on sync task type
5 participants