[Bug][Improve][Connector-v2][ElasticsearchSource] Fix behavior when source empty, support SourceConfig.SOURCE field empty. (#6425)
CosmosNi authored Apr 29, 2024
1 parent ce426fb commit 4e98eb8
Showing 9 changed files with 313 additions and 22 deletions.
14 changes: 7 additions & 7 deletions docs/en/connector-v2/source/Elasticsearch.md
@@ -29,9 +29,9 @@ support version >= 2.x and <= 8.x.
| query | json | no | {"match_all": {}} |
| scroll_time | string | no | 1m |
| scroll_size | int | no | 100 |
| schema | | no | - |
| tls_verify_certificate | boolean | no | true |
| tls_verify_hostnames | boolean | no | true |
| array_column | map | no | |
| tls_keystore_path | string | no | - |
| tls_keystore_password | string | no | - |
| tls_truststore_path | string | no | - |
@@ -58,7 +58,12 @@ Elasticsearch index name, support * fuzzy matching.

The fields of the index.
You can get the document id by specifying the field `_id`. If you sink `_id` to another index, you need to specify an alias for `_id` due to an Elasticsearch limitation.
If you don't config source, you must config `schema`.
If you don't configure `source`, the field list is automatically retrieved from the index mapping (see the example under `array_column` below).

### array_column [array]

The fields of array type.
Since there is no array type in an Elasticsearch mapping, you need to assign the array type explicitly, e.g. `{c_array = "array<tinyint>"}`.
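
A minimal sketch of a source block that omits `source` and declares one array field (the host, index name, and field names here are hypothetical):

```hocon
source {
  Elasticsearch {
    hosts = ["https://localhost:9200"]
    index = "st_index"
    # `source` omitted: the field list is retrieved from the index mapping
    array_column = {c_array = "array<tinyint>"}
    query = {"match_all": {}}
    scroll_time = "1m"
    scroll_size = 100
  }
}
```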

### query [json]

@@ -73,11 +78,6 @@ Amount of time Elasticsearch will keep the search context alive for scroll requests.

Maximum number of hits to be returned with each Elasticsearch scroll request.

### schema

The structure of the data, including field names and field types.
If you don't config schema, you must config `source`.

### tls_verify_certificate [boolean]

Enable certificates validation for HTTPS endpoints
@@ -40,6 +40,13 @@ public class SourceConfig {
                    .withDescription(
                            "The fields of index. You can get the document id by specifying the field _id. If you sink _id to another index, you need to specify an alias for _id due to an Elasticsearch limitation");

public static final Option<Map<String, String>> ARRAY_COLUMN =
Options.key("array_column")
.mapType()
.defaultValue(new HashMap<>())
                    .withDescription(
                            "Because there is no array type in Elasticsearch, the array type must be specified explicitly.");

public static final Option<String> SCROLL_TIME =
Options.key("scroll_time")
.stringType()
@@ -20,6 +20,7 @@
import org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.NullNode;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.TextNode;

@@ -117,7 +118,9 @@ SeaTunnelRow convert(ElasticsearchRecord rowRecord) {
value = recursiveGet(rowRecord.getDoc(), fieldName);
if (value != null) {
seaTunnelDataType = rowTypeInfo.getFieldType(i);
if (value instanceof TextNode) {
if (value instanceof NullNode) {
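                        // Elasticsearch returned an explicit JSON null (NullNode) for this field; map it to a SeaTunnel null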
seaTunnelFields[i] = null;
} else if (value instanceof TextNode) {
seaTunnelFields[i] =
convertValue(seaTunnelDataType, ((TextNode) value).textValue());
} else {
@@ -17,6 +17,8 @@

package org.apache.seatunnel.connectors.seatunnel.elasticsearch.source;

import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
@@ -27,6 +29,7 @@
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.catalog.SeaTunnelDataTypeConvertorUtil;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
@@ -38,11 +41,17 @@
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig;

import org.apache.commons.collections4.CollectionUtils;

import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

@Slf4j
public class ElasticsearchSource
implements SeaTunnelSource<
SeaTunnelRow, ElasticsearchSourceSplit, ElasticsearchSourceState>,
@@ -55,27 +64,40 @@ public class ElasticsearchSource

private List<String> source;

private Map<String, String> arrayColumn;

public ElasticsearchSource(ReadonlyConfig config) {
this.config = config;
if (config.getOptional(TableSchemaOptions.SCHEMA).isPresent()) {
            // TODO: remove schema support from the Elasticsearch source.
            log.warn(
                    "The schema config in the Elasticsearch source is deprecated, please use the source config instead!");
catalogTable = CatalogTableUtil.buildWithConfig(config);
source = Arrays.asList(catalogTable.getSeaTunnelRowType().getFieldNames());
} else {
source = config.get(SourceConfig.SOURCE);
arrayColumn = config.get(SourceConfig.ARRAY_COLUMN);
EsRestClient esRestClient = EsRestClient.createInstance(config);
Map<String, BasicTypeDefine<EsType>> esFieldType =
esRestClient.getFieldTypeMapping(config.get(SourceConfig.INDEX), source);
esRestClient.close();
SeaTunnelDataType<?>[] fieldTypes = new SeaTunnelDataType[source.size()];
for (int i = 0; i < source.size(); i++) {
BasicTypeDefine<EsType> esType = esFieldType.get(source.get(i));
SeaTunnelDataType<?> seaTunnelDataType =
ElasticSearchTypeConverter.INSTANCE.convert(esType).getDataType();
fieldTypes[i] = seaTunnelDataType;

if (CollectionUtils.isEmpty(source)) {
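                // `source` not configured: fall back to every field found in the index mapping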
source = new ArrayList<>(esFieldType.keySet());
}
SeaTunnelDataType[] fieldTypes = getSeaTunnelDataType(esFieldType, source);
TableSchema.Builder builder = TableSchema.builder();

for (int i = 0; i < source.size(); i++) {
String key = source.get(i);
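                // array types cannot be inferred from the ES mapping, so types declared via array_column take precedence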
if (arrayColumn.containsKey(key)) {
String value = arrayColumn.get(key);
SeaTunnelDataType<?> dataType =
SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(key, value);
builder.column(PhysicalColumn.of(key, dataType, 0, true, null, null));
continue;
}

builder.column(
PhysicalColumn.of(source.get(i), fieldTypes[i], 0, true, null, null));
}
@@ -127,4 +149,17 @@ public SourceReader<SeaTunnelRow, ElasticsearchSourceSplit> createReader(
return new ElasticsearchSourceSplitEnumerator(
enumeratorContext, sourceState, config, source);
}

@VisibleForTesting
public static SeaTunnelDataType[] getSeaTunnelDataType(
Map<String, BasicTypeDefine<EsType>> esFieldType, List<String> source) {
SeaTunnelDataType<?>[] fieldTypes = new SeaTunnelDataType[source.size()];
for (int i = 0; i < source.size(); i++) {
BasicTypeDefine<EsType> esType = esFieldType.get(source.get(i));
SeaTunnelDataType<?> seaTunnelDataType =
ElasticSearchTypeConverter.INSTANCE.convert(esType).getDataType();
fieldTypes[i] = seaTunnelDataType;
}
return fieldTypes;
}
}
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.elasticsearch;

import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.source.ElasticsearchSource;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

public class ElasticsearchSourceTest {
@Test
public void testPrepareWithEmptySource() throws PrepareFailException {
BasicTypeDefine.BasicTypeDefineBuilder<EsType> typeDefine =
BasicTypeDefine.<EsType>builder()
.name("field1")
.columnType("text")
.dataType("text");
Map<String, BasicTypeDefine<EsType>> esFieldType = new HashMap<>();
esFieldType.put("field1", typeDefine.build());
SeaTunnelDataType[] seaTunnelDataTypes =
ElasticsearchSource.getSeaTunnelDataType(
esFieldType, new ArrayList<>(esFieldType.keySet()));
Assertions.assertNotNull(seaTunnelDataTypes);
        Assertions.assertEquals(String.class, seaTunnelDataTypes[0].getTypeClass());
}
}
@@ -23,9 +23,11 @@

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.catalog.ElasticSearchCatalog;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.ScrollResult;
import org.apache.seatunnel.e2e.common.TestResource;
@@ -57,6 +59,7 @@
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -111,6 +114,7 @@ public void startUp() throws Exception {
testDataset = generateTestDataSet();
createIndexDocs();
createIndexWithFullType();
createIndexForResourceNull();
}

/** create an index and bulk some documents */
@@ -156,6 +160,16 @@ private void createIndexWithFullType() throws IOException, InterruptedException
2, esRestClient.getIndexDocsCount("st_index_full_type").get(0).getDocsCount());
}

private void createIndexForResourceNull() throws IOException {
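        // pre-create the index used by the without-schema e2e case (mapping loaded from a resource file)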
String mapping =
IOUtils.toString(
ContainerUtil.getResourcesFile(
"/elasticsearch/st_index_source_without_schema_and_sink.json")
.toURI(),
StandardCharsets.UTF_8);
esRestClient.createIndex("st_index4", mapping);
}

@TestTemplate
public void testElasticsearch(TestContainer container)
throws IOException, InterruptedException {
@@ -179,6 +193,19 @@ public void testElasticsearchWithFullType(TestContainer container)
esRestClient.getIndexDocsCount("st_index_full_type_target").get(0).getDocsCount());
}

@TestTemplate
public void testElasticsearchWithoutSchema(TestContainer container)
throws IOException, InterruptedException {

Container.ExecResult execResult =
container.executeJob(
"/elasticsearch/elasticsearch_source_without_schema_and_sink.conf");
Assertions.assertEquals(0, execResult.getExitCode());
List<String> sinkData = readSinkDataWithOutSchema();
        // the job's query DSL is: {"range":{"c_int":{"gte":10,"lte":20}}}
Assertions.assertIterableEquals(mapTestDatasetForDSL(), sinkData);
}

private List<String> generateTestDataSet() throws JsonProcessingException {
String[] fields =
new String[] {
@@ -188,12 +215,12 @@ private List<String> generateTestDataSet() throws JsonProcessingException {
"c_boolean",
"c_tinyint",
"c_smallint",
"c_int",
"c_bigint",
"c_float",
"c_double",
"c_decimal",
"c_bytes",
"c_int",
"c_date",
"c_timestamp"
};
@@ -227,6 +254,14 @@ private List<String> generateTestDataSet() throws JsonProcessingException {
return documents;
}

private List<String> readSinkDataWithOutSchema() throws InterruptedException {
Map<String, BasicTypeDefine<EsType>> esFieldType =
esRestClient.getFieldTypeMapping("st_index4", Lists.newArrayList());
Thread.sleep(2000);
List<String> source = new ArrayList<>(esFieldType.keySet());
return getDocsWithTransformDate(source, "st_index4");
}

private List<String> readSinkData() throws InterruptedException {
// wait for index refresh
Thread.sleep(2000);
@@ -238,32 +273,33 @@ private List<String> readSinkData() throws InterruptedException {
"c_boolean",
"c_tinyint",
"c_smallint",
"c_int",
"c_bigint",
"c_float",
"c_double",
"c_decimal",
"c_bytes",
"c_int",
"c_date",
"c_timestamp");
return getDocsWithTransformTimestamp(source, "st_index2");
}

private List<String> getDocsWithTransformTimestamp(List<String> source, String index) {
HashMap<String, Object> rangeParam = new HashMap<>();
rangeParam.put("gte", 10);
rangeParam.put("lte", 20);
HashMap<String, Object> range = new HashMap<>();
range.put("c_int", rangeParam);
Map<String, Object> query = new HashMap<>();
query.put("range", range);
ScrollResult scrollResult =
esRestClient.searchByScroll("st_index2", source, query, "1m", 1000);
ScrollResult scrollResult = esRestClient.searchByScroll(index, source, query, "1m", 1000);
scrollResult
.getDocs()
.forEach(
x -> {
x.remove("_index");
x.remove("_type");
x.remove("_id");
                            // normalize c_timestamp so sink documents compare equal to the generated test data
x.replace(
"c_timestamp",
LocalDateTime.parse(x.get("c_timestamp").toString())
@@ -280,6 +316,40 @@ private List<String> readSinkData() throws InterruptedException {
return docs;
}

private List<String> getDocsWithTransformDate(List<String> source, String index) {
HashMap<String, Object> rangeParam = new HashMap<>();
rangeParam.put("gte", 10);
rangeParam.put("lte", 20);
HashMap<String, Object> range = new HashMap<>();
range.put("c_int", rangeParam);
Map<String, Object> query = new HashMap<>();
query.put("range", range);
ScrollResult scrollResult = esRestClient.searchByScroll(index, source, query, "1m", 1000);
scrollResult
.getDocs()
.forEach(
x -> {
x.remove("_index");
x.remove("_type");
x.remove("_id");
x.replace(
"c_date",
LocalDate.parse(
x.get("c_date").toString(),
DateTimeFormatter.ofPattern(
"yyyy-MM-dd'T'HH:mm"))
.toString());
});
List<String> docs =
scrollResult.getDocs().stream()
.sorted(
Comparator.comparingInt(
o -> Integer.valueOf(o.get("c_int").toString())))
.map(JsonUtils::toJsonString)
.collect(Collectors.toList());
return docs;
}

private List<String> mapTestDatasetForDSL() {
return testDataset.stream()
.map(JsonUtils::parseObject)