[Feature][ConnectorV2]add file excel sink #2585

Closed
wants to merge 34 commits into from

@Bingz2
Contributor

Bingz2 commented Aug 31, 2022

Purpose of this pull request

Add [File] Excel sink
#1946

Check list

@CalvinKirs
Member

@TyrantLucifer Hi, PTAL

@Test
public void testFakeSourceToHdfsFileExcel() throws IOException, InterruptedException {
Container.ExecResult execResult = executeSeaTunnelSparkJob("/file/fakesource_to_hdfs_excel.conf");
Assertions.assertEquals(0, execResult.getExitCode());
Member

Could you also validate the sink data rows and data types?

Comment on lines 33 to 34
name = "string"
age = "int"
Member

Test all datatypes?

Contributor Author

I tested string, boolean, tinyint, smallint, int, bigint, float, double, and null; they all work.

@Test
public void testFakeSourceToHdfsFileExcel() throws IOException, InterruptedException {
Container.ExecResult execResult = executeSeaTunnelFlinkJob("/file/fakesource_to_hdfs_excel.conf");
Assertions.assertEquals(0, execResult.getExitCode());
Member

Are fakesource_to_local_excel.conf and fakesource_to_hdfs_excel.conf uncommitted?

}

public void writeData(SeaTunnelRow seaTunnelRow) {
Sheet st = wb.getSheet("Sheet1");
Member

How about treating st as a class attribute, so that createSheet is invoked when a SeaTunnelRow needs to be consumed? Of course, I have not read the source code in detail, so whether a sheet object has to be obtained on every write needs a double check.

Contributor Author

Oh, let me modify that.
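
The suggestion above could look roughly like the following. This is only a sketch: `LazySheetWriter` and its `sheetFactory` are hypothetical names, the type parameter `S` stands in for the POI sheet type, and the factory stands in for whatever call creates the sheet in the real writer.

```java
import java.util.function.Supplier;

// Hypothetical holder that keeps the sheet as a field instead of looking it
// up on every write. "S" stands in for the sheet type; the factory stands in
// for the sheet-creating call. Neither name comes from the PR.
final class LazySheetWriter<S> {
    private final Supplier<S> sheetFactory;
    private S sheet; // created when the first row arrives, then reused

    LazySheetWriter(Supplier<S> sheetFactory) {
        this.sheetFactory = sheetFactory;
    }

    // Returns the cached sheet, creating it only on first use.
    S sheet() {
        if (sheet == null) {
            sheet = sheetFactory.get();
        }
        return sheet;
    }
}
```

With this shape, writeData would call `sheet()` once per row and pay the creation cost only on the first row. The sketch is not thread-safe, which is assumed to be acceptable if each writer instance is used by a single task.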

@TyrantLucifer
Member

Your e2e test cases also have some problems, please check.

@Bingz2
Contributor Author

Bingz2 commented Sep 2, 2022

If I add the code in the screenshot, the Excel sink writer's finishAndCloseWriteFile method is called twice when the job runs on Flink, which results in an error!

However, if I remove this code and run the job on Spark, the Text sink writer is executed instead of the Excel sink writer.

@TyrantLucifer
Member

Can you provide the detailed call stack information?

@Bingz2
Contributor Author

Bingz2 commented Sep 3, 2022

SparkDataWriter's commit() method clears commitInfo after it runs. However, FlinkSinkWrite's prepareCommit does not clear commitInfo, so when FlinkSinkWrite's close() method is called it writes the Excel workbook again, even though the workbook was already closed during prepareCommit.
I don't understand why SparkDataWriter and FlinkSinkWrite have different commit logic. Do I need to check in finishAndCloseWriteFile whether the workbook is already closed?
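
One possible answer to the question above is to make the close step idempotent, so a second call from close() after prepareCommit is harmless. The following is a minimal sketch under that assumption; `GuardedExcelWriter` and its counter are hypothetical, and the real finishAndCloseWriteFile body is not reproduced here.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical writer showing an idempotent close guard. The counter is a
// placeholder for "write the workbook to the output stream and close it".
final class GuardedExcelWriter {
    private final AtomicBoolean closed = new AtomicBoolean(false);
    private int flushCount = 0;

    // Flushes and closes at most once; later calls return immediately.
    void finishAndCloseWriteFile() {
        if (!closed.compareAndSet(false, true)) {
            return; // already closed by an earlier prepareCommit or close
        }
        flushCount++;
    }

    int flushCount() {
        return flushCount;
    }

    boolean isClosed() {
        return closed.get();
    }
}
```

A guard like this hides the double call rather than explaining it, so it would complement, not replace, understanding why the two engines invoke the method differently.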

@Hisoka-X
Member

Hisoka-X commented Sep 5, 2022

The reason is already given in the code comment: "combine the prepareCommit and commit in this method". Flink supports both Committer and GlobalCommitter, but Spark only supports GlobalCommitter (same logic, different name). So Spark uses commit() to run both prepareCommit() and Committer.commit().
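
The difference described above can be pictured with a small sketch. The interfaces below (`SketchWriter`, `SketchCommitter`) and the `SparkStyleDataWriter` class are hypothetical simplifications, not the SeaTunnel, Flink, or Spark APIs; they only show one engine callback covering both phases.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Hypothetical, simplified stand-ins for the writer and committer roles.
interface SketchWriter {
    // Finishes pending files and returns commit info for them, if any.
    Optional<String> prepareCommit();
}

interface SketchCommitter {
    void commit(List<String> commitInfos);
}

// Spark-style adapter: a single commit() runs prepareCommit() and then the
// committer, because the engine offers only one commit hook per writer.
final class SparkStyleDataWriter {
    private final SketchWriter writer;
    private final SketchCommitter committer;

    SparkStyleDataWriter(SketchWriter writer, SketchCommitter committer) {
        this.writer = writer;
        this.committer = committer;
    }

    void commit() {
        List<String> infos = new ArrayList<>();
        writer.prepareCommit().ifPresent(infos::add);
        committer.commit(infos);
    }
}
```

On an engine with separate hooks, the two calls inside commit() would instead be triggered by the engine at different times, which is why a writer should not assume that close() is the first place its file gets finished.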

@Hisoka-X Hisoka-X added the Waiting for users feedback Waiting for feedback from issue/PR author label Sep 5, 2022
@Bingz2
Contributor Author

Bingz2 commented Sep 5, 2022

OK. My local E2E tests and compilation passed, but CI/CD failed, and I didn't use ES.

pom.xml Outdated
@@ -758,6 +760,19 @@
<artifactId>aliyun-sdk-datahub</artifactId>
<version>${datahub.version}</version>
</dependency>

<dependency>
Member

Hi, please move connector dependency to connector's pom.xml. Reference: #2630

Contributor Author

Ok, they have been moved to connector's pom.xml

cell.setCellValue((int) value);
cell.setCellStyle(wholeNumberCellStyle);
} else if (BasicType.LONG_TYPE.equals(type) || type.getSqlType().equals(SqlType.TIMESTAMP)) {
cell.setCellValue((long) value);
Member

Isn't the value type LocalDateTime here?

cell.setCellValue(JsonUtils.toJsonString(value));
cell.setCellStyle(stringCellStyle);
} else if (type.getSqlType().equals(SqlType.DATE) || type.getSqlType().equals(SqlType.TIME)) {
cell.setCellValue((String) value);
Member

Isn't the value type LocalTime or LocalDate here?
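
If, as the two questions above suggest, TIMESTAMP, DATE, and TIME values arrive as java.time objects, the casts to long and String would fail at runtime. A minimal sketch of a safer conversion follows; `TemporalCellText` is a hypothetical helper, and writing the values as ISO-formatted text is an assumption rather than what the PR settled on.

```java
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.format.DateTimeFormatter;

// Hypothetical helper: turns temporal values into text for a string cell
// instead of casting them to long or String.
final class TemporalCellText {
    private TemporalCellText() {
    }

    static String toCellText(Object value) {
        if (value instanceof LocalDateTime) {
            return ((LocalDateTime) value).format(DateTimeFormatter.ISO_LOCAL_DATE_TIME);
        }
        if (value instanceof LocalDate) {
            return ((LocalDate) value).format(DateTimeFormatter.ISO_LOCAL_DATE);
        }
        if (value instanceof LocalTime) {
            return ((LocalTime) value).format(DateTimeFormatter.ISO_LOCAL_TIME);
        }
        // Fallback for any other type, including null.
        return String.valueOf(value);
    }
}
```

The result would then be passed to the cell's string setter; whether the sink should instead write native Excel date cells is a separate design choice that this sketch does not address.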

} else if (BasicType.DOUBLE_TYPE.equals(type)) {
cell.setCellValue((double) value);
cell.setCellStyle(wholeNumberCellStyle);
} else if (type.getSqlType().equals(SqlType.BYTES) || type.getSqlType().equals(SqlType.ARRAY)) {
Member

Shouldn't ARRAY be written as JSON?

public ExcelGenerator(List<Integer> sinkColumnsIndexInRow, SeaTunnelRowType seaTunnelRowType) {
this.sinkColumnsIndexInRow = sinkColumnsIndexInRow;
this.seaTunnelRowType = seaTunnelRowType;
wb = new XSSFWorkbook();
Member

@hailin0 hailin0 Sep 7, 2022

Member

Agree with you.

@Bingz2
Contributor Author

Bingz2 commented Oct 30, 2022

Hi, @Bingz2 thanks for your contribution, what's the news about this pr? Do you need some help?
Sorry, I'm a little busy recently. I'll finish it this weekend

Thank you very much.

The CI reports an error, but the E2E tests ran locally with no errors, and I don't know why.

@EricJoy2048 EricJoy2048 reopened this Oct 31, 2022
@EricJoy2048
Member

Don't worry, just resolve the conflicts and rerun the CI. Thanks.

@EricJoy2048 EricJoy2048 closed this Nov 3, 2022
@EricJoy2048 EricJoy2048 reopened this Nov 3, 2022
@EricJoy2048 EricJoy2048 closed this Nov 4, 2022
@EricJoy2048 EricJoy2048 reopened this Nov 4, 2022
@EricJoy2048 EricJoy2048 closed this Nov 4, 2022
@EricJoy2048 EricJoy2048 reopened this Nov 4, 2022
@@ -86,6 +87,10 @@ The separator between columns in a row of data. Only needed by `text` and `csv`

The separator between rows in a file. Only needed by `text` and `csv` file format.

### max_rows_in_memory [int]
Member

add max_rows_in_memory to options

if (textFileSinkConfig.getMaxRowsInMemory() > 0) {
wb = new SXSSFWorkbook(textFileSinkConfig.getMaxRowsInMemory());
} else {
wb = new XSSFWorkbook();
Member

Is the default value of MaxRowsInMemory Long.MAX_VALUE?

You can choose one of the following:

  1. State in the documentation that if MaxRowsInMemory is not set, all data will be buffered in memory
  2. Set a default value xxx for MaxRowsInMemory

Contributor Author

OK, I'm going to set a default.
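
Option 2 could be sketched as below. Everything here is hypothetical: the `MaxRowsInMemoryOption` class, the placeholder default of 100, and the decision to reject non-positive values are illustrative assumptions, not values or behavior taken from the PR.

```java
import java.util.Map;

// Hypothetical resolver for the max_rows_in_memory option.
final class MaxRowsInMemoryOption {
    static final String KEY = "max_rows_in_memory";
    // Placeholder default for illustration only; the PR does not state one.
    static final int PLACEHOLDER_DEFAULT = 100;

    private MaxRowsInMemoryOption() {
    }

    // Returns the configured value, or the default when the key is absent.
    // Assumes the config layer already parsed the value into a Number.
    static int resolve(Map<String, Object> options) {
        Object raw = options.get(KEY);
        if (raw == null) {
            return PLACEHOLDER_DEFAULT;
        }
        int value = ((Number) raw).intValue();
        if (value <= 0) {
            throw new IllegalArgumentException(KEY + " must be positive, got " + value);
        }
        return value;
    }
}
```

With a default in place, the writer would always receive a positive window size, so the branch that falls back to a fully in-memory workbook would only be reachable if the project chose to keep it deliberately.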

Comment on lines +34 to +35
name = "string"
age = "int"
Member

Please test writing all data types to the Excel file.

Suggested change
name = "string"
age = "int"
c_map = "map<string, string>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_bytes = bytes
c_date = date
c_decimal = "decimal(38, 18)"
c_timestamp = timestamp
c_row = {
c_map = "map<string, string>"
c_array = "array<int>"
c_string = string
c_boolean = boolean
c_tinyint = tinyint
c_smallint = smallint
c_int = int
c_bigint = bigint
c_float = float
c_double = double
c_bytes = bytes
c_date = date
c_decimal = "decimal(38, 18)"
c_timestamp = timestamp

is_partition_field_write_in_file=true
file_name_expression="${transactionId}_${now}"
file_format="excel"
sink_columns=["name","age"]
Member

remove

Suggested change
sink_columns=["name","age"]

Comment on lines +44 to +51
transform {
sql {
sql = "select name,age from fake"
}

# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
# please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
}
Member

remove

Suggested change
transform {
sql {
sql = "select name,age from fake"
}
# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
# please go to https://seatunnel.apache.org/docs/flink/configuration/transform-plugins/Sql
}

is_partition_field_write_in_file=true
file_name_expression="${transactionId}_${now}"
file_format="excel"
sink_columns=["name","age"]
Member

remove

Suggested change
sink_columns=["name","age"]

Comment on lines +44 to +51
transform {
sql {
sql = "select name,age from fake"
}

# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
# please go to https://seatunnel.apache.org/docs/transform/sql
}
Member

remove

Suggested change
transform {
sql {
sql = "select name,age from fake"
}
# If you would like to get more information about how to configure seatunnel and see full list of transform plugins,
# please go to https://seatunnel.apache.org/docs/transform/sql
}

Contributor Author

Hi, I need to install fonts to use SXSSFWorkbook. How do I install them in the test container? For example, with the following command: `RUN apk add --update font-adobe-100dpi ttf-dejavu fontconfig`

Contributor

Maybe you need to write a Dockerfile to do that. Add the required content on top of this image, then build your own image and push it to Docker Hub.

@MonsterChenzhuo
Contributor

@Bingz2 Will this PR go ahead? If not, I would like to take over and finish it.

@Bingz2
Contributor Author

Bingz2 commented Jan 18, 2023

OK, thanks.

@Bingz2 Bingz2 closed this Jan 18, 2023
Labels
connectors-v2 Waiting for users feedback Waiting for feedback from issue/PR author
Projects
Status: Done
8 participants