[Feature][Connector V2][File] Add config of 'file_filter_pattern', which is used for filtering files.
FlechazoW committed Jul 31, 2023
1 parent 2a85525 commit 2054800
Showing 28 changed files with 646 additions and 169 deletions.
4 changes: 2 additions & 2 deletions docs/en/connector-v2/sink/Doris.md
@@ -32,7 +32,7 @@ Version Supported

## Sink Options

| Name | Type | Required | Default | Description |
|---------------------|--------|----------|------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| fenodes | String | Yes | - | `Doris` cluster fenodes address, the format is `"fe_ip:fe_http_port, ..."` |
| username | String | Yes | - | `Doris` user username |
@@ -49,7 +49,7 @@ Version Supported

## Data Type Mapping

| Doris Data type | SeaTunnel Data type |
|-----------------|-----------------------------------------|
| BOOLEAN | BOOLEAN |
| TINYINT | TINYINT |
5 changes: 5 additions & 0 deletions docs/en/connector-v2/source/CosFile.md
@@ -55,6 +55,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
| schema | config | no | - |
| common-options | | no | - |
| sheet_name | string | no | - |
| file_filter_pattern | string | no | - |

### path [string]

@@ -247,6 +248,10 @@ Source plugin common parameters, please refer to [Source Common Options](common-

Reads the sheet of the workbook. Only used when file_format is excel.

### file_filter_pattern [string]

Filter pattern, which is used to filter files.
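For instance, the option can be combined with `path` in a source block like the following sketch. The pattern value is illustrative, and the connector's other required options (bucket, credentials, file format, and so on) are omitted:

```hocon
CosFile {
  path = "/seatunnel/read"
  # Hypothetical regex: only files whose full name matches the pattern,
  # e.g. abcD.csv, are read; other files under `path` are skipped.
  file_filter_pattern = "abc[DX]*.csv"
}
```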

## Example

1 change: 1 addition & 0 deletions docs/en/connector-v2/source/FtpFile.md
@@ -48,6 +48,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you
| schema | config | no | - |
| common-options | | no | - |
| sheet_name | string | no | - |
| file_filter_pattern | string | no | - |

### host [string]

5 changes: 5 additions & 0 deletions docs/en/connector-v2/source/HdfsFile.md
@@ -53,6 +53,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
| schema | config | no | - |
| common-options | | no | - |
| sheet_name | string | no | - |
| file_filter_pattern | string | no | - |

### path [string]

@@ -245,6 +246,10 @@ Source plugin common parameters, please refer to [Source Common Options](common-

Reads the sheet of the workbook. Only used when file_format is excel.

### file_filter_pattern [string]

Filter pattern, which is used to filter files.

## Example

2 changes: 1 addition & 1 deletion docs/en/connector-v2/source/Hudi.md
@@ -39,7 +39,7 @@ In order to use this connector, You must ensure your spark/flink cluster already

## Source Options

| Name | Type | Required | Default | Description |
|-------------------------|--------|------------------------------|---------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| table.path              | String | Yes                          | -       | The hdfs root path of the Hudi table, such as 'hdfs://nameservice/data/hudi/hudi_table/'. |
| table.type              | String | Yes                          | -       | The type of the Hudi table. Currently only 'cow' is supported; 'mor' is not supported yet. |
5 changes: 5 additions & 0 deletions docs/en/connector-v2/source/LocalFile.md
@@ -49,6 +49,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
| schema | config | no | - |
| common-options | | no | - |
| sheet_name | string | no | - |
| file_filter_pattern | string | no | - |

### path [string]

@@ -225,6 +226,10 @@ Source plugin common parameters, please refer to [Source Common Options](common-

Reads the sheet of the workbook. Only used when file_format is excel.

### file_filter_pattern [string]

Filter pattern, which is used to filter files.

## Example

5 changes: 5 additions & 0 deletions docs/en/connector-v2/source/OssFile.md
@@ -56,6 +56,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
| schema | config | no | - |
| common-options | | no | - |
| sheet_name | string | no | - |
| file_filter_pattern | string | no | - |

### path [string]

@@ -282,6 +283,10 @@ Reads the sheet of the workbook. Only used when file_format is excel.
```

### file_filter_pattern [string]

Filter pattern, which is used to filter files.

## Changelog

### 2.2.0-beta 2022-09-26
5 changes: 5 additions & 0 deletions docs/en/connector-v2/source/OssJindoFile.md
@@ -56,6 +56,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
| schema | config | no | - |
| common-options | | no | - |
| sheet_name | string | no | - |
| file_filter_pattern | string | no | - |

### path [string]

@@ -248,6 +249,10 @@ Source plugin common parameters, please refer to [Source Common Options](common-

Reads the sheet of the workbook. Only used when file_format is excel.

### file_filter_pattern [string]

Filter pattern, which is used to filter files.

## Example

5 changes: 5 additions & 0 deletions docs/en/connector-v2/source/S3File.md
@@ -57,6 +57,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
| schema | config | no | - |
| common-options | | no | - |
| sheet_name | string | no | - |
| file_filter_pattern | string | no | - |

### path [string]

@@ -299,6 +300,10 @@ Reads the sheet of the workbook. Only used when file_format is excel.
```

### file_filter_pattern [string]

Filter pattern, which is used to filter files.

## Changelog

### 2.3.0-beta 2022-10-20
5 changes: 5 additions & 0 deletions docs/en/connector-v2/source/SftpFile.md
@@ -47,6 +47,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you
| schema | config | no | - |
| common-options | | no | - |
| sheet_name | string | no | - |
| file_filter_pattern | string | no | - |

### host [string]

@@ -226,6 +227,10 @@ Source plugin common parameters, please refer to [Source Common Options](common-

Reads the sheet of the workbook. Only used when file_format is excel.

### file_filter_pattern [string]

Filter pattern, which is used to filter files.

## Example

@@ -112,4 +112,11 @@ public class BaseSourceConfig {
.stringType()
.noDefaultValue()
.withDescription("To be read sheet name,only valid for excel files");

public static final Option<String> FILE_FILTER_PATTERN =
Options.key("file_filter_pattern")
.stringType()
.noDefaultValue()
                    .withDescription(
                            "File pattern. The connector will filter some files based on the pattern.");
}
@@ -43,6 +43,9 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

import static org.apache.parquet.avro.AvroReadSupport.READ_INT96_AS_FIXED;
import static org.apache.parquet.avro.AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS;
@@ -74,6 +77,8 @@ public abstract class AbstractReadStrategy implements ReadStrategy {
protected long skipHeaderNumber = BaseSourceConfig.SKIP_HEADER_ROW_NUMBER.defaultValue();
protected transient boolean isKerberosAuthorization = false;

protected Pattern pattern;

@Override
public void init(HadoopConf conf) {
this.hadoopConf = conf;
@@ -126,7 +131,7 @@ public List<String> getFileNamesByPath(HadoopConf hadoopConf, String path) throw
fileNames.addAll(getFileNamesByPath(hadoopConf, fileStatus.getPath().toString()));
continue;
}
if (fileStatus.isFile()) {
if (fileStatus.isFile() && filterFileByPattern(fileStatus)) {
// filter '_SUCCESS' file
if (!fileStatus.getPath().getName().equals("_SUCCESS")
&& !fileStatus.getPath().getName().startsWith(".")) {
@@ -146,6 +151,12 @@ public List<String> getFileNamesByPath(HadoopConf hadoopConf, String path) throw
}
}
}

if (fileNames.isEmpty()) {
throw new IllegalArgumentException(
"Got no file, please check the configuration parameters such as: [file_filter_pattern]");
}

return fileNames;
}

@@ -166,6 +177,11 @@ public void setPluginConfig(Config pluginConfig) {
if (pluginConfig.hasPath(BaseSourceConfig.READ_COLUMNS.key())) {
readColumns.addAll(pluginConfig.getStringList(BaseSourceConfig.READ_COLUMNS.key()));
}
if (pluginConfig.hasPath(BaseSourceConfig.FILE_FILTER_PATTERN.key())) {
String filterPattern =
pluginConfig.getString(BaseSourceConfig.FILE_FILTER_PATTERN.key());
this.pattern = Pattern.compile(Matcher.quoteReplacement(filterPattern));
}
}

@Override
@@ -214,4 +230,11 @@ protected SeaTunnelRowType mergePartitionTypes(String path, SeaTunnelRowType sea
// return merge row type
return new SeaTunnelRowType(newFieldNames, newFieldTypes);
}

protected boolean filterFileByPattern(FileStatus fileStatus) {
if (Objects.nonNull(pattern)) {
return pattern.matcher(fileStatus.getPath().getName()).matches();
}
return true;
}
}
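The filtering logic added above can be sketched in isolation. The following is a minimal stand-alone illustration, not the connector class itself: `filterFileByPattern` here takes the file name directly instead of a Hadoop `FileStatus`, and the pattern value is hypothetical.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class FilterDemo {

    // Mirrors the new filterFileByPattern(): a null pattern accepts every
    // file; otherwise the whole file name must match the compiled regex.
    static boolean filterFileByPattern(Pattern pattern, String fileName) {
        if (pattern != null) {
            return pattern.matcher(fileName).matches();
        }
        return true;
    }

    public static void main(String[] args) {
        // As in setPluginConfig(), the option value passes through
        // Matcher.quoteReplacement before Pattern.compile. That call only
        // escapes '\' and '$', so a pattern like the one below reaches the
        // regex engine unchanged.
        String filterPattern = "abc[DX]*.csv";
        Pattern pattern = Pattern.compile(Matcher.quoteReplacement(filterPattern));

        System.out.println(filterFileByPattern(pattern, "abcD.csv"));   // true
        System.out.println(filterFileByPattern(pattern, "other.csv"));  // false
        System.out.println(filterFileByPattern(null, "anything.txt"));  // true
    }
}
```

Note that `matches()` requires the pattern to cover the entire file name, so a substring pattern such as `abc` alone would not match `abcD.csv`.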
@@ -60,6 +60,7 @@ public OptionRule optionRule() {
.optional(BaseSourceConfig.DATE_FORMAT)
.optional(BaseSourceConfig.DATETIME_FORMAT)
.optional(BaseSourceConfig.TIME_FORMAT)
.optional(BaseSourceConfig.FILE_FILTER_PATTERN)
.build();
}

@@ -60,6 +60,7 @@ public OptionRule optionRule() {
.optional(BaseSourceConfig.DATE_FORMAT)
.optional(BaseSourceConfig.DATETIME_FORMAT)
.optional(BaseSourceConfig.TIME_FORMAT)
.optional(BaseSourceConfig.FILE_FILTER_PATTERN)
.build();
}

@@ -57,6 +57,7 @@ public OptionRule optionRule() {
.optional(BaseSourceConfig.DATE_FORMAT)
.optional(BaseSourceConfig.DATETIME_FORMAT)
.optional(BaseSourceConfig.TIME_FORMAT)
.optional(BaseSourceConfig.FILE_FILTER_PATTERN)
.build();
}

@@ -60,6 +60,7 @@ public OptionRule optionRule() {
.optional(BaseSourceConfig.DATE_FORMAT)
.optional(BaseSourceConfig.DATETIME_FORMAT)
.optional(BaseSourceConfig.TIME_FORMAT)
.optional(BaseSourceConfig.FILE_FILTER_PATTERN)
.build();
}

@@ -56,6 +56,7 @@ public OptionRule optionRule() {
.optional(BaseSourceConfig.DATE_FORMAT)
.optional(BaseSourceConfig.DATETIME_FORMAT)
.optional(BaseSourceConfig.TIME_FORMAT)
.optional(BaseSourceConfig.FILE_FILTER_PATTERN)
.build();
}

@@ -60,6 +60,7 @@ public OptionRule optionRule() {
.optional(BaseSourceConfig.DATE_FORMAT)
.optional(BaseSourceConfig.DATETIME_FORMAT)
.optional(BaseSourceConfig.TIME_FORMAT)
.optional(BaseSourceConfig.FILE_FILTER_PATTERN)
.build();
}

@@ -65,6 +65,7 @@ public OptionRule optionRule() {
.optional(BaseSourceConfig.DATE_FORMAT)
.optional(BaseSourceConfig.DATETIME_FORMAT)
.optional(BaseSourceConfig.TIME_FORMAT)
.optional(BaseSourceConfig.FILE_FILTER_PATTERN)
.build();
}

@@ -60,6 +60,7 @@ public OptionRule optionRule() {
.optional(BaseSourceConfig.DATE_FORMAT)
.optional(BaseSourceConfig.DATETIME_FORMAT)
.optional(BaseSourceConfig.TIME_FORMAT)
.optional(BaseSourceConfig.FILE_FILTER_PATTERN)
.build();
}
