[Feature][Connector V2][File] Add config of 'file_filter_pattern', which is used for filtering files. #5153

Merged · 2 commits · Jul 31, 2023

Commits:
- File filter
- Filter by extension
4 changes: 2 additions & 2 deletions docs/en/connector-v2/sink/Doris.md
@@ -32,7 +32,7 @@ Version Supported

## Sink Options

| Name | Type | Required | Default | Description |
| Name | Type | Required | Default | Description |
|---------------------|--------|----------|------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| fenodes | String | Yes | - | `Doris` cluster fenodes address, the format is `"fe_ip:fe_http_port, ..."` |
| username | String | Yes | - | `Doris` user username |
@@ -49,7 +49,7 @@ Version Supported

## Data Type Mapping

| Doris Data type | SeaTunnel Data type |
| Doris Data type | SeaTunnel Data type |
|-----------------|-----------------------------------------|
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
5 changes: 5 additions & 0 deletions docs/en/connector-v2/source/CosFile.md
@@ -55,6 +55,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
| schema | config | no | - |
| common-options | | no | - |
| sheet_name | string | no | - |
| file_filter_pattern | string | no | - |

### path [string]

@@ -247,6 +248,10 @@ Source plugin common parameters, please refer to [Source Common Options](common-

Read the sheet of the workbook. Only used when file_format is excel.

### file_filter_pattern [string]

Filter pattern, which is used to filter files.
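A minimal sketch of how the new option could be set in a source block. The path, format, and pattern values are illustrative, and other required CosFile options (bucket, credentials) are omitted for brevity:

```hocon
source {
  CosFile {
    path = "/seatunnel/read"
    file_format = "csv"
    # Java regex matched against the whole file name:
    # keeps orders_2023.csv, skips orders_2023.json
    file_filter_pattern = ".*\\.csv"
  }
}
```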

## Example

```hocon
1 change: 1 addition & 0 deletions docs/en/connector-v2/source/FtpFile.md
@@ -48,6 +48,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you
| schema | config | no | - |
| common-options | | no | - |
| sheet_name | string | no | - |
| file_filter_pattern | string | no | - |

### host [string]

5 changes: 5 additions & 0 deletions docs/en/connector-v2/source/HdfsFile.md
@@ -53,6 +53,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
| schema | config | no | - |
| common-options | | no | - |
| sheet_name | string | no | - |
| file_filter_pattern | string | no | - |

### path [string]

@@ -245,6 +246,10 @@ Source plugin common parameters, please refer to [Source Common Options](common-

Read the sheet of the workbook. Only used when file_format is excel.

### file_filter_pattern [string]

Filter pattern, which is used to filter files.

## Example

```hocon
2 changes: 1 addition & 1 deletion docs/en/connector-v2/source/Hudi.md
@@ -39,7 +39,7 @@ In order to use this connector, You must ensure your spark/flink cluster already

## Source Options

| Name | Type | Required | Default | Description |
| Name | Type | Required | Default | Description |
|-------------------------|--------|------------------------------|---------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| table.path              | String | Yes                          | -       | The hdfs root path of the hudi table, such as 'hdfs://nameserivce/data/hudi/hudi_table/'. |
| table.type              | String | Yes                          | -       | The type of the hudi table. Currently only 'cow' is supported; 'mor' is not supported yet. |
5 changes: 5 additions & 0 deletions docs/en/connector-v2/source/LocalFile.md
@@ -49,6 +49,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
| schema | config | no | - |
| common-options | | no | - |
| sheet_name | string | no | - |
| file_filter_pattern | string | no | - |

### path [string]

@@ -225,6 +226,10 @@ Source plugin common parameters, please refer to [Source Common Options](common-

Read the sheet of the workbook. Only used when file_format is excel.

### file_filter_pattern [string]

Filter pattern, which is used to filter files.

## Example

```hocon
5 changes: 5 additions & 0 deletions docs/en/connector-v2/source/OssFile.md
@@ -56,6 +56,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
| schema | config | no | - |
| common-options | | no | - |
| sheet_name | string | no | - |
| file_filter_pattern | string | no | - |

### path [string]

@@ -282,6 +283,10 @@ Read the sheet of the workbook. Only used when file_format is excel.

```

### file_filter_pattern [string]

Filter pattern, which is used to filter files.

## Changelog

### 2.2.0-beta 2022-09-26
5 changes: 5 additions & 0 deletions docs/en/connector-v2/source/OssJindoFile.md
@@ -56,6 +56,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
| schema | config | no | - |
| common-options | | no | - |
| sheet_name | string | no | - |
| file_filter_pattern | string | no | - |

### path [string]

@@ -248,6 +249,10 @@ Source plugin common parameters, please refer to [Source Common Options](common-

Read the sheet of the workbook. Only used when file_format is excel.

### file_filter_pattern [string]

Filter pattern, which is used to filter files.

## Example

```hocon
5 changes: 5 additions & 0 deletions docs/en/connector-v2/source/S3File.md
@@ -57,6 +57,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
| schema | config | no | - |
| common-options | | no | - |
| sheet_name | string | no | - |
| file_filter_pattern | string | no | - |

### path [string]

@@ -299,6 +300,10 @@ Read the sheet of the workbook. Only used when file_format is excel.

```

### file_filter_pattern [string]

Filter pattern, which is used to filter files.

## Changelog

### 2.3.0-beta 2022-10-20
5 changes: 5 additions & 0 deletions docs/en/connector-v2/source/SftpFile.md
@@ -47,6 +47,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you
| schema | config | no | - |
| common-options | | no | - |
| sheet_name | string | no | - |
| file_filter_pattern | string | no | - |

### host [string]

@@ -226,6 +227,10 @@ Source plugin common parameters, please refer to [Source Common Options](common-

Read the sheet of the workbook. Only used when file_format is excel.

### file_filter_pattern [string]

Filter pattern, which is used to filter files.

## Example

```hocon
@@ -79,12 +79,7 @@ public void prepare(Config pluginConfig) throws PrepareFailException {
throw new FileConnectorException(
FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, e);
}
if (filePaths.isEmpty()) {
throw new FileConnectorException(
FileConnectorErrorCode.FILE_LIST_EMPTY,
"The target file list is empty,"
+ "SeaTunnel will not be able to sync empty table");
}

// support user-defined schema
FileFormat fileFormat =
FileFormat.valueOf(
@@ -112,4 +112,11 @@ public class BaseSourceConfig {
.stringType()
.noDefaultValue()
.withDescription("To be read sheet name,only valid for excel files");

public static final Option<String> FILE_FILTER_PATTERN =
Options.key("file_filter_pattern")
.stringType()
.noDefaultValue()
.withDescription(
"File pattern. The connector will filter some files base on the pattern.");
}
@@ -24,6 +24,8 @@
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;

import org.apache.hadoop.conf.Configuration;
@@ -43,6 +45,9 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED;
import static org.apache.parquet.avro.AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS;
@@ -74,6 +79,8 @@ public abstract class AbstractReadStrategy implements ReadStrategy {
protected long skipHeaderNumber = BaseSourceConfig.SKIP_HEADER_ROW_NUMBER.defaultValue();
protected transient boolean isKerberosAuthorization = false;

protected Pattern pattern;

@Override
public void init(HadoopConf conf) {
this.hadoopConf = conf;
@@ -126,7 +133,7 @@ public List<String> getFileNamesByPath(HadoopConf hadoopConf, String path) throw
fileNames.addAll(getFileNamesByPath(hadoopConf, fileStatus.getPath().toString()));
continue;
}
if (fileStatus.isFile()) {
if (fileStatus.isFile() && filterFileByPattern(fileStatus)) {
// filter '_SUCCESS' file
if (!fileStatus.getPath().getName().equals("_SUCCESS")
&& !fileStatus.getPath().getName().startsWith(".")) {
@@ -146,6 +153,15 @@ public List<String> getFileNamesByPath(HadoopConf hadoopConf, String path) throw
}
}
}

if (fileNames.isEmpty()) {
throw new FileConnectorException(
FileConnectorErrorCode.FILE_LIST_EMPTY,
"The target file list is empty. "
+ "SeaTunnel will not be able to sync an empty table. "
+ "Please check configuration parameters such as [file_filter_pattern]");
}

return fileNames;
}
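The traversal above plus the new empty-list check can be sketched outside Hadoop with `java.nio` (class name, error type, and message text are illustrative, not the connector's API): recurse into directories, keep regular files whose name matches the pattern, and fail fast when the filter leaves nothing to read.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Stream;

public class FileListingSketch {
    static List<String> listFiles(Path root, Pattern filter) throws IOException {
        List<String> fileNames = new ArrayList<>();
        try (Stream<Path> walk = Files.walk(root)) {
            walk.filter(Files::isRegularFile)
                    // a null filter means file_filter_pattern was not configured
                    .filter(p -> filter == null
                            || filter.matcher(p.getFileName().toString()).matches())
                    .forEach(p -> fileNames.add(p.toString()));
        }
        if (fileNames.isEmpty()) {
            throw new IllegalStateException(
                    "The target file list is empty; check [file_filter_pattern]");
        }
        return fileNames;
    }
}
```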

@@ -166,6 +182,11 @@ public void setPluginConfig(Config pluginConfig) {
if (pluginConfig.hasPath(BaseSourceConfig.READ_COLUMNS.key())) {
readColumns.addAll(pluginConfig.getStringList(BaseSourceConfig.READ_COLUMNS.key()));
}
if (pluginConfig.hasPath(BaseSourceConfig.FILE_FILTER_PATTERN.key())) {
String filterPattern =
pluginConfig.getString(BaseSourceConfig.FILE_FILTER_PATTERN.key());
// Compile the user-supplied regex as-is; Matcher.quoteReplacement
// escapes replacement strings, not patterns, and mangles backslashes.
this.pattern = Pattern.compile(filterPattern);
}
}
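Worth noting here: `Matcher.quoteReplacement` escapes replacement strings (backslashes and `$`) for use in `Matcher.replaceAll`, not regex patterns, so wrapping a user-supplied pattern in it silently changes any pattern containing a backslash. A small standalone illustration:

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class QuoteReplacementPitfall {
    public static void main(String[] args) {
        String userPattern = ".*\\.txt"; // the regex .*\.txt as a user would type it

        // quoteReplacement doubles the backslash: the regex becomes .*\\.txt,
        // which now requires a literal backslash before "txt" in the file name
        String mangled = Matcher.quoteReplacement(userPattern);

        System.out.println(Pattern.compile(userPattern).matcher("a.txt").matches()); // true
        System.out.println(Pattern.compile(mangled).matcher("a.txt").matches());     // false
    }
}
```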

@Override
@@ -214,4 +235,11 @@ protected SeaTunnelRowType mergePartitionTypes(String path, SeaTunnelRowType sea
// return merge row type
return new SeaTunnelRowType(newFieldNames, newFieldTypes);
}

protected boolean filterFileByPattern(FileStatus fileStatus) {
if (Objects.nonNull(pattern)) {
return pattern.matcher(fileStatus.getPath().getName()).matches();
}
return true;
}
}
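A standalone sketch of the matching semantics in `filterFileByPattern` (class name and pattern values are illustrative): `Matcher.matches()` requires the regex to cover the entire file name, so a shell glob like `*.txt` must be written as the regex `.*\.txt`, and a missing pattern accepts every file.

```java
import java.util.Objects;
import java.util.regex.Pattern;

public class FilterSemanticsDemo {
    // Mirrors filterFileByPattern: a null pattern means no filter was configured.
    static boolean accepts(Pattern pattern, String fileName) {
        if (Objects.nonNull(pattern)) {
            return pattern.matcher(fileName).matches();
        }
        return true;
    }

    public static void main(String[] args) {
        Pattern txtOnly = Pattern.compile(".*\\.txt");
        System.out.println(accepts(txtOnly, "report.txt"));    // true: regex covers the whole name
        System.out.println(accepts(txtOnly, "report.txt.gz")); // false: trailing ".gz" is left over
        System.out.println(accepts(null, "anything.bin"));     // true: no filter configured
    }
}
```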
@@ -60,6 +60,7 @@ public OptionRule optionRule() {
.optional(BaseSourceConfig.DATE_FORMAT)
.optional(BaseSourceConfig.DATETIME_FORMAT)
.optional(BaseSourceConfig.TIME_FORMAT)
.optional(BaseSourceConfig.FILE_FILTER_PATTERN)
.build();
}

@@ -60,6 +60,7 @@ public OptionRule optionRule() {
.optional(BaseSourceConfig.DATE_FORMAT)
.optional(BaseSourceConfig.DATETIME_FORMAT)
.optional(BaseSourceConfig.TIME_FORMAT)
.optional(BaseSourceConfig.FILE_FILTER_PATTERN)
.build();
}

@@ -57,6 +57,7 @@ public OptionRule optionRule() {
.optional(BaseSourceConfig.DATE_FORMAT)
.optional(BaseSourceConfig.DATETIME_FORMAT)
.optional(BaseSourceConfig.TIME_FORMAT)
.optional(BaseSourceConfig.FILE_FILTER_PATTERN)
.build();
}

@@ -60,6 +60,7 @@ public OptionRule optionRule() {
.optional(BaseSourceConfig.DATE_FORMAT)
.optional(BaseSourceConfig.DATETIME_FORMAT)
.optional(BaseSourceConfig.TIME_FORMAT)
.optional(BaseSourceConfig.FILE_FILTER_PATTERN)
.build();
}

@@ -56,6 +56,7 @@ public OptionRule optionRule() {
.optional(BaseSourceConfig.DATE_FORMAT)
.optional(BaseSourceConfig.DATETIME_FORMAT)
.optional(BaseSourceConfig.TIME_FORMAT)
.optional(BaseSourceConfig.FILE_FILTER_PATTERN)
.build();
}

@@ -60,6 +60,7 @@ public OptionRule optionRule() {
.optional(BaseSourceConfig.DATE_FORMAT)
.optional(BaseSourceConfig.DATETIME_FORMAT)
.optional(BaseSourceConfig.TIME_FORMAT)
.optional(BaseSourceConfig.FILE_FILTER_PATTERN)
.build();
}

@@ -65,6 +65,7 @@ public OptionRule optionRule() {
.optional(BaseSourceConfig.DATE_FORMAT)
.optional(BaseSourceConfig.DATETIME_FORMAT)
.optional(BaseSourceConfig.TIME_FORMAT)
.optional(BaseSourceConfig.FILE_FILTER_PATTERN)
.build();
}

@@ -60,6 +60,7 @@ public OptionRule optionRule() {
.optional(BaseSourceConfig.DATE_FORMAT)
.optional(BaseSourceConfig.DATETIME_FORMAT)
.optional(BaseSourceConfig.TIME_FORMAT)
.optional(BaseSourceConfig.FILE_FILTER_PATTERN)
.build();
}
