Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Bug][Improve][Connector-v2][ElasticsearchSource] Fix behavior when source empty,Support SourceConfig.SOURCE field empty. #6425

Merged
merged 63 commits into from
Apr 29, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
63 commits
Select commit Hold shift + click to select a range
2281c85
[Bug][Connector-v2][ElasticsearchSource] Fix behavior when source empty.
CosmosNi Feb 29, 2024
c17c900
[Bug][Connector-v2][ElasticsearchSource] Fix behavior when source empty.
CosmosNi Mar 7, 2024
2009ea1
[Bug][Connector-v2][ElasticsearchSource] Fix behavior when source empty.
Mar 9, 2024
d168c3c
[Bug][Improve][Connector-v2][ElasticsearchSource] Fix behavior when s…
Mar 11, 2024
46e6fd3
Merge remote-tracking branch 'upstream/dev' into dev
Mar 11, 2024
a973109
Merge remote-tracking branch 'upstream/dev' into dev
CosmosNi Mar 26, 2024
eedf254
[Bug][Connector-v2][ElasticsearchSource] Fix behavior when source empty.
CosmosNi Mar 26, 2024
facccd0
[Bug][Connector-v2][ElasticsearchSource] Fix behavior when source empty.
CosmosNi Mar 26, 2024
9c68ce6
[Bug][Connector-v2][ElasticsearchSource] Fix behavior when source empty.
CosmosNi Mar 26, 2024
81356a0
[Bug][Connector-v2][ElasticsearchSource] Fix behavior when source empty.
CosmosNi Mar 26, 2024
b113a88
[Bug][Connector-v2][ElasticsearchSource] Fix behavior when source empty.
CosmosNi Mar 26, 2024
f2d86c1
[Bug][Connector-v2][ElasticsearchSource] Fix behavior when source empty.
CosmosNi Mar 26, 2024
fbb4e26
[Bug][Connector-v2][ElasticsearchSource] Fix behavior when source empty.
CosmosNi Mar 26, 2024
c590b46
[Bug][Connector-v2][ElasticsearchSource] Fix behavior when source empty.
CosmosNi Mar 26, 2024
36a75f5
[Bug][Connector-v2][ElasticsearchSource] Fix behavior when source empty.
CosmosNi Mar 26, 2024
afa0cee
[Bug][Connector-v2][ElasticsearchSource] Fix behavior when source empty.
CosmosNi Mar 26, 2024
87d5c2c
[Bug][Connector-v2][ElasticsearchSource] Fix behavior when source empty.
CosmosNi Mar 26, 2024
c37e5ea
[Bug][Connector-v2][ElasticsearchSource] Fix behavior when source empty.
CosmosNi Mar 26, 2024
8107b30
[Bug][Connector-v2][ElasticsearchSource] Fix behavior when source empty.
CosmosNi Mar 26, 2024
51832b6
Merge remote-tracking branch 'upstream/dev' into dev
CosmosNi Apr 7, 2024
aedc55e
[Bug][Connector-v2][ElasticsearchSource] Fix behavior when source empty.
CosmosNi Apr 7, 2024
89cd9a8
[Bug][Connector-v2][ElasticsearchSource] Fix behavior when source empty.
CosmosNi Apr 7, 2024
d379dc3
[Bug][Connector-v2][ElasticsearchSource] Fix behavior when source empty.
CosmosNi Apr 7, 2024
3a0b8e2
[Bug][Connector-v2][ElasticsearchSource] Fix behavior when source empty.
CosmosNi Apr 7, 2024
1a5cc5c
Merge remote-tracking branch 'origin/dev' into dev
Apr 8, 2024
3c82a23
[Improve][Connector-v2][ElasticsearchSource]mark schema config in ES …
Apr 8, 2024
93a3adc
[Improve][Connector-v2][ElasticsearchSource]mark schema config in ES …
Apr 8, 2024
7bfdd2e
Merge remote-tracking branch 'upstream/dev' into dev
CosmosNi Apr 9, 2024
530c78b
Merge remote-tracking branch 'upstream/dev' into dev
CosmosNi Apr 9, 2024
25c212b
Update seatunnel-api/src/main/java/org/apache/seatunnel/api/table/cat…
Hisoka-X Apr 11, 2024
fff00fd
Merge remote-tracking branch 'upstream/dev' into dev
CosmosNi Apr 11, 2024
26c9ef5
[Bug][Connector-v2][ElasticsearchSource] mark schema config in ES sou…
CosmosNi Apr 11, 2024
dd8d5ef
Merge remote-tracking branch 'origin/dev' into dev
CosmosNi Apr 11, 2024
4d815df
[Bug][Connector-v2][ElasticsearchSource] Fix behavior when source empty.
CosmosNi Apr 11, 2024
f59fbc6
[Bug][Connector-v2][ElasticsearchSource] Fix behavior when source empty.
CosmosNi Apr 11, 2024
8b5b2b9
[Bug][Connector-v2][ElasticsearchSource] Fix behavior when source empty.
CosmosNi Apr 11, 2024
f0fb936
[Bug][Connector-v2][ElasticsearchSource] Fix behavior when source empty.
CosmosNi Apr 11, 2024
48c7372
[Bug][Connector-v2][ElasticsearchSource] Fix behavior when source empty.
CosmosNi Apr 11, 2024
6029faf
[Bug][Connector-v2][ElasticsearchSource] Fix behavior when source empty.
CosmosNi Apr 11, 2024
a2163f6
[Bug][Connector-v2][ElasticsearchSource] Fix behavior when source empty.
Apr 11, 2024
2f7a535
[Bug][Connector-v2][ElasticsearchSource] Fix behavior when source empty.
Apr 11, 2024
01a08d8
seaTunnelDataTypes
Apr 11, 2024
997257d
[Bug][Connector-v2][ElasticsearchSource] Fix behavior when source empty.
Apr 11, 2024
1696706
[Bug][Connector-v2][ElasticsearchSource] Fix behavior when source empty
Apr 11, 2024
ad1249e
[Bug][Connector-v2][ElasticsearchSource] Fix behavior for array
CosmosNi Apr 12, 2024
a5cb18e
[Bug][Connector-v2][ElasticsearchSource] Fix behavior for array
CosmosNi Apr 12, 2024
8bc132d
[Bug][Connector-v2][ElasticsearchSource] Fix behavior for array
CosmosNi Apr 12, 2024
33070ff
[Bug][Connector-v2][ElasticsearchSource] Fix behavior for array
CosmosNi Apr 12, 2024
bb9e0d3
[Bug][Connector-v2][ElasticsearchSource] Fix behavior for array
CosmosNi Apr 12, 2024
dfebc53
[Bug][Connector-v2][ElasticsearchSource] Fix behavior for array
Apr 12, 2024
a548b37
[Bug][Connector-v2][ElasticsearchSource] Fix behavior for array
Apr 13, 2024
fd7ffda
[Bug][Connector-v2][ElasticsearchSource] Fix behavior for array
Apr 13, 2024
4790e1d
[Bug][Connector-v2][ElasticsearchSource] Fix behavior for array
Apr 13, 2024
cbb1745
[Bug][Connector-v2][ElasticsearchSource] Fix behavior for array
Apr 13, 2024
fb6cd22
[Bug][Connector-v2][ElasticsearchSource] map key sorted
Apr 13, 2024
a926be4
[Bug][Connector-v2][ElasticsearchSource] map key sorted
Apr 13, 2024
5fe883d
Merge remote-tracking branch 'upstream/dev' into dev
Apr 13, 2024
f250a50
[Bug][Connector-v2][ElasticsearchSource] Fix behavior for array
CosmosNi Apr 15, 2024
7af8a41
[Bug][Connector-v2][ElasticsearchSource] Fix behavior for array
CosmosNi Apr 15, 2024
0ddd58f
Merge remote-tracking branch 'upstream/dev' into dev
CosmosNi Apr 28, 2024
3bb4e54
[Bug][Connector-v2][ElasticsearchSource] fix doc.
CosmosNi Apr 28, 2024
7fa0e9e
[Bug][Connector-v2][ElasticsearchSource] Fix behavior for array
CosmosNi Apr 28, 2024
191b852
[Bug][Connector-v2][ElasticsearchSource] Fix behavior for array
CosmosNi Apr 28, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions docs/en/connector-v2/source/Elasticsearch.md
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@ support version >= 2.x and <= 8.x.
| query | json | no | {"match_all": {}} |
| scroll_time | string | no | 1m |
| scroll_size | int | no | 100 |
| schema | | no | - |
| tls_verify_certificate | boolean | no | true |
| tls_verify_hostnames | boolean | no | true |
| array_column | map | no | |
| tls_keystore_path | string | no | - |
| tls_keystore_password | string | no | - |
| tls_truststore_path | string | no | - |
Expand All @@ -58,7 +58,12 @@ Elasticsearch index name, support * fuzzy matching.

The fields of index.
You can get the document id by specifying the field `_id`.If sink _id to other index,you need specify an alias for _id due to the Elasticsearch limit.
If you don't config source, you must config `schema`.
If you don't config source, it is automatically retrieved from the mapping of the index.

### array_column [array]

The fields of array type.
Since there is no array index in es,so need assign array type,just like `{c_array = "array<tinyint>"}`.

### query [json]

Expand All @@ -73,11 +78,6 @@ Amount of time Elasticsearch will keep the search context alive for scroll reque

Maximum number of hits to be returned with each Elasticsearch scroll request.

### schema

The structure of the data, including field names and field types.
If you don't config schema, you must config `source`.

### tls_verify_certificate [boolean]

Enable certificates validation for HTTPS endpoints
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ public class SourceConfig {
.withDescription(
"The fields of index. You can get the document id by specifying the field _id.If sink _id to other index,you need specify an alias for _id due to the Elasticsearch limit");

public static final Option<Map<String, String>> ARRAY_COLUMN =
Options.key("array_column")
.mapType()
.defaultValue(new HashMap<>())
.withDescription(
"Because there is no array type in es,so need specify array Type.");

public static final Option<String> SCROLL_TIME =
Options.key("scroll_time")
.stringType()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.apache.seatunnel.shade.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.seatunnel.shade.com.fasterxml.jackson.core.type.TypeReference;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.NullNode;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.node.TextNode;

Expand Down Expand Up @@ -117,7 +118,9 @@ SeaTunnelRow convert(ElasticsearchRecord rowRecord) {
value = recursiveGet(rowRecord.getDoc(), fieldName);
if (value != null) {
seaTunnelDataType = rowTypeInfo.getFieldType(i);
if (value instanceof TextNode) {
if (value instanceof NullNode) {
seaTunnelFields[i] = null;
} else if (value instanceof TextNode) {
seaTunnelFields[i] =
convertValue(seaTunnelDataType, ((TextNode) value).textValue());
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.apache.seatunnel.connectors.seatunnel.elasticsearch.source;

import org.apache.seatunnel.shade.com.google.common.annotations.VisibleForTesting;

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
Expand All @@ -27,6 +29,7 @@
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.PhysicalColumn;
import org.apache.seatunnel.api.table.catalog.SeaTunnelDataTypeConvertorUtil;
import org.apache.seatunnel.api.table.catalog.TableIdentifier;
import org.apache.seatunnel.api.table.catalog.TableSchema;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
Expand All @@ -38,11 +41,17 @@
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.config.SourceConfig;

import org.apache.commons.collections4.CollectionUtils;

import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

@Slf4j
public class ElasticsearchSource
implements SeaTunnelSource<
SeaTunnelRow, ElasticsearchSourceSplit, ElasticsearchSourceState>,
Expand All @@ -55,27 +64,40 @@ public class ElasticsearchSource

private List<String> source;

private Map<String, String> arrayColumn;

public ElasticsearchSource(ReadonlyConfig config) {
this.config = config;
if (config.getOptional(TableSchemaOptions.SCHEMA).isPresent()) {
// todo: We need to remove the schema in ES.
log.warn(
"The schema config in ElasticSearch sink is deprecated, please use source config instead!");
catalogTable = CatalogTableUtil.buildWithConfig(config);
source = Arrays.asList(catalogTable.getSeaTunnelRowType().getFieldNames());
} else {
source = config.get(SourceConfig.SOURCE);
arrayColumn = config.get(SourceConfig.ARRAY_COLUMN);
EsRestClient esRestClient = EsRestClient.createInstance(config);
Map<String, BasicTypeDefine<EsType>> esFieldType =
esRestClient.getFieldTypeMapping(config.get(SourceConfig.INDEX), source);
esRestClient.close();
SeaTunnelDataType<?>[] fieldTypes = new SeaTunnelDataType[source.size()];
for (int i = 0; i < source.size(); i++) {
BasicTypeDefine<EsType> esType = esFieldType.get(source.get(i));
SeaTunnelDataType<?> seaTunnelDataType =
ElasticSearchTypeConverter.INSTANCE.convert(esType).getDataType();
fieldTypes[i] = seaTunnelDataType;

if (CollectionUtils.isEmpty(source)) {
source = new ArrayList<>(esFieldType.keySet());
}
SeaTunnelDataType[] fieldTypes = getSeaTunnelDataType(esFieldType, source);
TableSchema.Builder builder = TableSchema.builder();

for (int i = 0; i < source.size(); i++) {
String key = source.get(i);
if (arrayColumn.containsKey(key)) {
String value = arrayColumn.get(key);
SeaTunnelDataType<?> dataType =
SeaTunnelDataTypeConvertorUtil.deserializeSeaTunnelDataType(key, value);
builder.column(PhysicalColumn.of(key, dataType, 0, true, null, null));
continue;
}

builder.column(
PhysicalColumn.of(source.get(i), fieldTypes[i], 0, true, null, null));
}
Expand Down Expand Up @@ -127,4 +149,17 @@ public SourceReader<SeaTunnelRow, ElasticsearchSourceSplit> createReader(
return new ElasticsearchSourceSplitEnumerator(
enumeratorContext, sourceState, config, source);
}

@VisibleForTesting
public static SeaTunnelDataType[] getSeaTunnelDataType(
Map<String, BasicTypeDefine<EsType>> esFieldType, List<String> source) {
SeaTunnelDataType<?>[] fieldTypes = new SeaTunnelDataType[source.size()];
for (int i = 0; i < source.size(); i++) {
BasicTypeDefine<EsType> esType = esFieldType.get(source.get(i));
SeaTunnelDataType<?> seaTunnelDataType =
ElasticSearchTypeConverter.INSTANCE.convert(esType).getDataType();
fieldTypes[i] = seaTunnelDataType;
}
return fieldTypes;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.connectors.seatunnel.elasticsearch;

import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.source.ElasticsearchSource;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

public class ElasticsearchSourceTest {
@Test
public void testPrepareWithEmptySource() throws PrepareFailException {
BasicTypeDefine.BasicTypeDefineBuilder<EsType> typeDefine =
BasicTypeDefine.<EsType>builder()
.name("field1")
.columnType("text")
.dataType("text");
Map<String, BasicTypeDefine<EsType>> esFieldType = new HashMap<>();
esFieldType.put("field1", typeDefine.build());
SeaTunnelDataType[] seaTunnelDataTypes =
ElasticsearchSource.getSeaTunnelDataType(
esFieldType, new ArrayList<>(esFieldType.keySet()));
Assertions.assertNotNull(seaTunnelDataTypes);
Assertions.assertEquals(seaTunnelDataTypes[0].getTypeClass(), String.class);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,11 @@

import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.converter.BasicTypeDefine;
import org.apache.seatunnel.common.utils.JsonUtils;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.catalog.ElasticSearchCatalog;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsRestClient;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.client.EsType;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.BulkResponse;
import org.apache.seatunnel.connectors.seatunnel.elasticsearch.dto.source.ScrollResult;
import org.apache.seatunnel.e2e.common.TestResource;
Expand Down Expand Up @@ -57,6 +59,7 @@
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -111,6 +114,7 @@ public void startUp() throws Exception {
testDataset = generateTestDataSet();
createIndexDocs();
createIndexWithFullType();
createIndexForResourceNull();
}

/** create a index,and bulk some documents */
Expand Down Expand Up @@ -156,6 +160,16 @@ private void createIndexWithFullType() throws IOException, InterruptedException
2, esRestClient.getIndexDocsCount("st_index_full_type").get(0).getDocsCount());
}

private void createIndexForResourceNull() throws IOException {
String mapping =
IOUtils.toString(
ContainerUtil.getResourcesFile(
"/elasticsearch/st_index_source_without_schema_and_sink.json")
.toURI(),
StandardCharsets.UTF_8);
esRestClient.createIndex("st_index4", mapping);
}

@TestTemplate
public void testElasticsearch(TestContainer container)
throws IOException, InterruptedException {
Expand All @@ -179,6 +193,19 @@ public void testElasticsearchWithFullType(TestContainer container)
esRestClient.getIndexDocsCount("st_index_full_type_target").get(0).getDocsCount());
}

@TestTemplate
public void testElasticsearchWithoutSchema(TestContainer container)
throws IOException, InterruptedException {

Container.ExecResult execResult =
container.executeJob(
"/elasticsearch/elasticsearch_source_without_schema_and_sink.conf");
Assertions.assertEquals(0, execResult.getExitCode());
List<String> sinkData = readSinkDataWithOutSchema();
// for DSL is: {"range":{"c_int":{"gte":10,"lte":20}}}
Assertions.assertIterableEquals(mapTestDatasetForDSL(), sinkData);
}

private List<String> generateTestDataSet() throws JsonProcessingException {
String[] fields =
new String[] {
Expand All @@ -188,12 +215,12 @@ private List<String> generateTestDataSet() throws JsonProcessingException {
"c_boolean",
"c_tinyint",
"c_smallint",
"c_int",
"c_bigint",
"c_float",
"c_double",
"c_decimal",
"c_bytes",
"c_int",
"c_date",
"c_timestamp"
};
Expand Down Expand Up @@ -227,6 +254,14 @@ private List<String> generateTestDataSet() throws JsonProcessingException {
return documents;
}

private List<String> readSinkDataWithOutSchema() throws InterruptedException {
Map<String, BasicTypeDefine<EsType>> esFieldType =
esRestClient.getFieldTypeMapping("st_index4", Lists.newArrayList());
Thread.sleep(2000);
List<String> source = new ArrayList<>(esFieldType.keySet());
return getDocsWithTransformDate(source, "st_index4");
}

private List<String> readSinkData() throws InterruptedException {
// wait for index refresh
Thread.sleep(2000);
Expand All @@ -238,32 +273,33 @@ private List<String> readSinkData() throws InterruptedException {
"c_boolean",
"c_tinyint",
"c_smallint",
"c_int",
"c_bigint",
"c_float",
"c_double",
"c_decimal",
"c_bytes",
"c_int",
"c_date",
"c_timestamp");
return getDocsWithTransformTimestamp(source, "st_index2");
}

private List<String> getDocsWithTransformTimestamp(List<String> source, String index) {
HashMap<String, Object> rangeParam = new HashMap<>();
rangeParam.put("gte", 10);
rangeParam.put("lte", 20);
HashMap<String, Object> range = new HashMap<>();
range.put("c_int", rangeParam);
Map<String, Object> query = new HashMap<>();
query.put("range", range);
ScrollResult scrollResult =
esRestClient.searchByScroll("st_index2", source, query, "1m", 1000);
ScrollResult scrollResult = esRestClient.searchByScroll(index, source, query, "1m", 1000);
scrollResult
.getDocs()
.forEach(
x -> {
x.remove("_index");
x.remove("_type");
x.remove("_id");
// I don’t know if converting the test cases in this way complies with
// the CI specification
x.replace(
"c_timestamp",
LocalDateTime.parse(x.get("c_timestamp").toString())
Expand All @@ -280,6 +316,40 @@ private List<String> readSinkData() throws InterruptedException {
return docs;
}

private List<String> getDocsWithTransformDate(List<String> source, String index) {
HashMap<String, Object> rangeParam = new HashMap<>();
rangeParam.put("gte", 10);
rangeParam.put("lte", 20);
HashMap<String, Object> range = new HashMap<>();
range.put("c_int", rangeParam);
Map<String, Object> query = new HashMap<>();
query.put("range", range);
ScrollResult scrollResult = esRestClient.searchByScroll(index, source, query, "1m", 1000);
scrollResult
.getDocs()
.forEach(
x -> {
x.remove("_index");
x.remove("_type");
x.remove("_id");
x.replace(
"c_date",
LocalDate.parse(
x.get("c_date").toString(),
DateTimeFormatter.ofPattern(
"yyyy-MM-dd'T'HH:mm"))
.toString());
});
List<String> docs =
scrollResult.getDocs().stream()
.sorted(
Comparator.comparingInt(
o -> Integer.valueOf(o.get("c_int").toString())))
.map(JsonUtils::toJsonString)
.collect(Collectors.toList());
return docs;
}

private List<String> mapTestDatasetForDSL() {
return testDataset.stream()
.map(JsonUtils::parseObject)
Expand Down
Loading
Loading