Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Feature][ConnectorV2]add file excel sink #2585

Closed
wants to merge 34 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
049652d
add file excel sink
Aug 31, 2022
e525451
Merge branch 'apache:dev' into dev
Bingz2 Sep 1, 2022
89e2b4f
add flink e2e conf,support all datatype
Sep 1, 2022
0a5fd71
modify sheet to member variable
Sep 2, 2022
b64e547
Merge branch 'apache:dev' into dev
Bingz2 Sep 4, 2022
1265c23
fix flink e2e
Sep 4, 2022
a595be9
move excel dependency to file-connector
Sep 6, 2022
182f009
Handling code conflicts
Sep 7, 2022
4ad6160
Handling code conflicts
Sep 7, 2022
c218193
Handling code conflicts
Sep 7, 2022
c5470d4
Merge branch 'apache:dev' into dev
Bingz2 Sep 7, 2022
4973b36
Merge branch 'apache:dev' into dev
Bingz2 Sep 7, 2022
e6178bb
Merge branch 'apache:dev' into dev
Bingz2 Sep 9, 2022
0fe3dfe
Merge branch 'apache:dev' into dev
Bingz2 Sep 13, 2022
93cbbb0
Handling code conflicts
Bingz2 Sep 18, 2022
a9e57f1
Merge branch 'apache:dev' into dev
Bingz2 Sep 18, 2022
2513f4f
Handling code conflicts
Bingz2 Sep 24, 2022
144bb27
Merge branch 'apache:dev' into dev
Bingz2 Sep 24, 2022
a699681
use SXSSFWorkbook
Bingz2 Sep 24, 2022
2fb89b7
Resolving code conflicts
Bingz2 Oct 15, 2022
d3359f7
Resolving code conflicts
Bingz2 Oct 15, 2022
f025b89
Merge branch 'apache:dev' into dev
Bingz2 Oct 15, 2022
9a77d7e
Support for more data types
Bingz2 Oct 15, 2022
06edf3b
Support for more data types
Bingz2 Oct 16, 2022
6eee00e
Merge branch 'dev' into dev
EricJoy2048 Oct 25, 2022
162ba2f
Merge branch 'apache:dev' into dev
Bingz2 Oct 29, 2022
7605250
Merge branch 'apache:dev' into dev
Bingz2 Oct 30, 2022
971bc78
Resolving code conflicts
Nov 2, 2022
00134e4
Merge branch 'apache:dev' into dev
Bingz2 Nov 2, 2022
874af27
Resolving code conflicts
Nov 11, 2022
4f8d572
Merge branch 'apache:dev' into dev
Bingz2 Nov 11, 2022
5754104
Resolving code conflicts
Nov 18, 2022
78f8b2d
Merge branch 'apache:dev' into dev
Bingz2 Nov 18, 2022
9870a07
Merge branch 'apache:dev' into dev
Bingz2 Nov 19, 2022
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion docs/en/connector-v2/sink/HdfsFile.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ Please note that, If `is_enable_transaction` is `true`, we will auto add `${tran

We supported as the following file types:

`text` `csv` `parquet` `orc` `json`
`text` `csv` `parquet` `orc` `json` `excel`

Please note that, The final file name will ends with the file_format's suffix, the suffix of the text file is `txt`.

Expand Down Expand Up @@ -161,3 +161,21 @@ HdfsFile {
}

```

For excel file format

```bash

HdfsFile {
path="hdfs://mycluster/tmp/hive/warehouse/test2"
partition_by=["age"]
partition_dir_expression="${k0}=${v0}"
is_partition_field_write_in_file=true
file_name_expression="${transactionId}_${now}"
file_format="excel"
sink_columns=["name","age"]
filename_time_format="yyyy.MM.dd"
is_enable_transaction=true
}

```
20 changes: 19 additions & 1 deletion docs/en/connector-v2/sink/LocalFile.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ Please note that, If `is_enable_transaction` is `true`, we will auto add `${tran

We supported as the following file types:

`text` `csv` `parquet` `orc` `json`
`text` `csv` `parquet` `orc` `json` `excel`

Please note that, The final file name will ends with the file_format's suffix, the suffix of the text file is `txt`.

Expand Down Expand Up @@ -159,3 +159,21 @@ LocalFile {
}

```

For excel file format

```bash

LocalFile {
path="file:///tmp/hive/warehouse/test2"
partition_by=["age"]
partition_dir_expression="${k0}=${v0}"
is_partition_field_write_in_file=true
file_name_expression="${transactionId}_${now}"
file_format="excel"
sink_columns=["name","age"]
filename_time_format="yyyy.MM.dd"
is_enable_transaction=true
}

```
15 changes: 15 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,8 @@
<neo4j-java-driver.version>4.4.9</neo4j-java-driver.version>
<jedis.version>4.2.2</jedis.version>
<datahub.version>2.19.0-public</datahub.version>
<poi.version>4.1.2</poi.version>
<poi-ooxml.version>4.1.2</poi-ooxml.version>
</properties>

<dependencyManagement>
Expand Down Expand Up @@ -997,6 +999,19 @@
<artifactId>aliyun-sdk-datahub</artifactId>
<version>${datahub.version}</version>
</dependency>

<dependency>
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, please move connector dependency to connector's pom.xml. Reference: #2630

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, they have been moved to connector's pom.xml

<groupId>org.apache.poi</groupId>
<artifactId>poi</artifactId>
<version>${poi.version}</version>
</dependency>

<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi-ooxml</artifactId>
<version>${poi-ooxml.version}</version>
</dependency>

</dependencies>

</dependencyManagement>
Expand Down
11 changes: 11 additions & 0 deletions seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -93,5 +93,16 @@
<scope>provided</scope>
</dependency>


<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi</artifactId>
</dependency>

<dependency>
<groupId>org.apache.poi</groupId>
<artifactId>poi-ooxml</artifactId>
</dependency>

</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
package org.apache.seatunnel.connectors.seatunnel.file.config;

import org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.ExcelWriteStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.JsonWriteStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.OrcWriteStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.ParquetWriteStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.TextWriteStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ExcelReadStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.JsonReadStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.OrcReadStrategy;
import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ParquetReadStrategy;
Expand Down Expand Up @@ -87,6 +89,17 @@ public WriteStrategy getWriteStrategy(TextFileSinkConfig textFileSinkConfig) {
public ReadStrategy getReadStrategy() {
return new JsonReadStrategy();
}
},
EXCEL("xlsx") {
@Override
public WriteStrategy getWriteStrategy(TextFileSinkConfig textFileSinkConfig) {
return new ExcelWriteStrategy(textFileSinkConfig);
}

@Override
public ReadStrategy getReadStrategy() {
return new ExcelReadStrategy();
}
};

private final String suffix;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.file.sink.util;

import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;

import org.apache.poi.ss.usermodel.Cell;
import org.apache.poi.ss.usermodel.CellStyle;
import org.apache.poi.ss.usermodel.CreationHelper;
import org.apache.poi.ss.usermodel.Row;
import org.apache.poi.ss.usermodel.Sheet;
import org.apache.poi.ss.usermodel.Workbook;
import org.apache.poi.xssf.usermodel.XSSFWorkbook;

import java.io.IOException;
import java.io.OutputStream;
import java.util.List;

public class ExcelGenerator {
private Workbook wb;
private CellStyle wholeNumberCellStyle;
private CellStyle stringCellStyle;
private int row = 0;
private List<Integer> sinkColumnsIndexInRow;
private SeaTunnelRowType seaTunnelRowType;

public ExcelGenerator(List<Integer> sinkColumnsIndexInRow, SeaTunnelRowType seaTunnelRowType) {
this.sinkColumnsIndexInRow = sinkColumnsIndexInRow;
this.seaTunnelRowType = seaTunnelRowType;
wb = new XSSFWorkbook();
Copy link
Member

@hailin0 hailin0 Sep 7, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree with you.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sheet st = wb.createSheet("Sheet1");
Row row = st.createRow(this.row);
for (Integer i : sinkColumnsIndexInRow) {
String fieldName = seaTunnelRowType.getFieldName(i);
row.createCell(i).setCellValue(fieldName);
}

wholeNumberCellStyle = createStyle(wb, "General");
stringCellStyle = createStyle(wb, "@");
this.row += 1;
}

public void writeData(SeaTunnelRow seaTunnelRow) {
Sheet st = wb.getSheet("Sheet1");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about treated st as a class attribute, when a SeaTunnelRow needs be consumed createSheet will be invoked. Of course, I have no detailed source code understanding, in each write whether to obtain a sheet object needs a double check.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh ,Let me modify that

Row excelRow = st.createRow(this.row);
SeaTunnelDataType<?>[] fieldTypes = seaTunnelRowType.getFieldTypes();
for (Integer i : sinkColumnsIndexInRow) {
Cell cell = excelRow.createCell(i);
Object value = seaTunnelRow.getField(i);
makeConverter(fieldTypes[i], value, cell);
}
this.row += 1;
}

public void flushAndCloseExcel(OutputStream output) throws IOException {
wb.write(output);
wb.close();
}

private void makeConverter(SeaTunnelDataType<?> type, Object value, Cell cell) {
if (value == null) {
cell.setBlank();
} else if (BasicType.STRING_TYPE.equals(type)) {
cell.setCellValue((String) value);
cell.setCellStyle(stringCellStyle);
} else if (BasicType.BOOLEAN_TYPE.equals(type)) {
cell.setCellValue((Boolean) value);
cell.setCellStyle(wholeNumberCellStyle);
} else if (BasicType.BYTE_TYPE.equals(type)) {
cell.setCellValue((byte) value);
cell.setCellStyle(wholeNumberCellStyle);
} else if (BasicType.SHORT_TYPE.equals(type)) {
cell.setCellValue((short) value);
cell.setCellStyle(wholeNumberCellStyle);
} else if (BasicType.INT_TYPE.equals(type)) {
cell.setCellValue((int) value);
cell.setCellStyle(wholeNumberCellStyle);
} else if (BasicType.LONG_TYPE.equals(type)) {
cell.setCellValue((long) value);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

value type is LocalDateTime?

cell.setCellStyle(wholeNumberCellStyle);
} else if (BasicType.FLOAT_TYPE.equals(type)) {
cell.setCellValue((float) value);
cell.setCellStyle(wholeNumberCellStyle);
} else if (BasicType.DOUBLE_TYPE.equals(type)) {
cell.setCellValue((double) value);
cell.setCellStyle(wholeNumberCellStyle);
} else {
String errorMsg = String.format("[%s] type not support ", type.getSqlType());
throw new RuntimeException(errorMsg);
}
}

private CellStyle createStyle(Workbook wb, String format) {
CreationHelper creationHelper = wb.getCreationHelper();
CellStyle cellStyle = wb.createCellStyle();
cellStyle.setDataFormat(creationHelper.createDataFormat().getFormat(format));
return cellStyle;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;

import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
import org.apache.seatunnel.connectors.seatunnel.file.sink.util.ExcelGenerator;
import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;

import lombok.NonNull;
import org.apache.hadoop.fs.FSDataOutputStream;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

public class ExcelWriteStrategy extends AbstractWriteStrategy {
private Map<String, ExcelGenerator> beingWrittenWriter;

public ExcelWriteStrategy(TextFileSinkConfig textFileSinkConfig) {
super(textFileSinkConfig);
this.beingWrittenWriter = new HashMap<>();
}

@Override
public void write(SeaTunnelRow seaTunnelRow) throws Exception {
String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
ExcelGenerator excelGenerator = getOrCreateExcelGenerator(filePath);
excelGenerator.writeData(seaTunnelRow);
}

@Override
public void finishAndCloseFile() {
this.beingWrittenWriter.forEach((k, v) -> {
try {
FileSystemUtils.createFile(k);
FSDataOutputStream fileOutputStream = FileSystemUtils.getOutputStream(k);
v.flushAndCloseExcel(fileOutputStream);
fileOutputStream.close();
} catch (IOException e) {
log.error("can not get output file stream");
throw new RuntimeException(e);
}
needMoveFiles.put(k, getTargetLocation(k));
});
}

private ExcelGenerator getOrCreateExcelGenerator(@NonNull String filePath) {
ExcelGenerator excelGenerator = this.beingWrittenWriter.get(filePath);
if (excelGenerator == null) {
excelGenerator = new ExcelGenerator(sinkColumnsIndexInRow, seaTunnelRowType);
this.beingWrittenWriter.put(filePath, excelGenerator);
}
return excelGenerator;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.file.source.reader;

import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;

public class ExcelReadStrategy extends AbstractReadStrategy {
@Override
public void read(String path, Collector<SeaTunnelRow> output) throws Exception {

}

@Override
public SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, String path) throws FilePluginException {
return null;
}
}
Loading