Skip to content

Commit 3fb32c3

Browse files
committed
added a basic test that enriching data works
1 parent 002810a commit 3fb32c3

File tree

1 file changed

+104
-0
lines changed

1 file changed

+104
-0
lines changed
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License;
4+
* you may not use this file except in compliance with the Elastic License.
5+
*/
6+
package org.elasticsearch.xpack.enrich;
7+
8+
import org.elasticsearch.action.admin.indices.refresh.RefreshRequest;
9+
import org.elasticsearch.action.bulk.BulkRequest;
10+
import org.elasticsearch.action.bulk.BulkResponse;
11+
import org.elasticsearch.action.get.GetRequest;
12+
import org.elasticsearch.action.get.GetResponse;
13+
import org.elasticsearch.action.index.IndexRequest;
14+
import org.elasticsearch.action.ingest.PutPipelineRequest;
15+
import org.elasticsearch.common.bytes.BytesArray;
16+
import org.elasticsearch.common.xcontent.XContentType;
17+
import org.elasticsearch.index.reindex.ReindexPlugin;
18+
import org.elasticsearch.plugins.Plugin;
19+
import org.elasticsearch.test.ESSingleNodeTestCase;
20+
import org.elasticsearch.xpack.core.enrich.EnrichPolicy;
21+
import org.elasticsearch.xpack.core.enrich.action.ExecuteEnrichPolicyAction;
22+
import org.elasticsearch.xpack.core.enrich.action.PutEnrichPolicyAction;
23+
24+
import java.util.Collection;
25+
import java.util.HashSet;
26+
import java.util.List;
27+
import java.util.Map;
28+
import java.util.Set;
29+
30+
import static org.elasticsearch.xpack.enrich.EnrichMultiNodeIT.DECORATE_FIELDS;
31+
import static org.elasticsearch.xpack.enrich.EnrichMultiNodeIT.KEY_FIELD;
32+
import static org.elasticsearch.xpack.enrich.EnrichMultiNodeIT.SOURCE_INDEX_NAME;
33+
import static org.hamcrest.Matchers.equalTo;
34+
import static org.hamcrest.Matchers.is;
35+
36+
public class BasicEnrichTests extends ESSingleNodeTestCase {
37+
38+
@Override
39+
protected Collection<Class<? extends Plugin>> getPlugins() {
40+
return List.of(LocalStateEnrich.class, ReindexPlugin.class);
41+
}
42+
43+
public void testIngestDataWithEnrichProcessor() {
44+
int numDocs = 32;
45+
List<String> keys = createSourceIndex(numDocs);
46+
47+
String policyName = "my-policy";
48+
EnrichPolicy enrichPolicy =
49+
new EnrichPolicy(EnrichPolicy.EXACT_MATCH_TYPE, null, List.of(SOURCE_INDEX_NAME), KEY_FIELD, List.of(DECORATE_FIELDS));
50+
PutEnrichPolicyAction.Request request = new PutEnrichPolicyAction.Request(policyName, enrichPolicy);
51+
client().execute(PutEnrichPolicyAction.INSTANCE, request).actionGet();
52+
client().execute(ExecuteEnrichPolicyAction.INSTANCE, new ExecuteEnrichPolicyAction.Request(policyName)).actionGet();
53+
54+
String pipelineName = "my-pipeline";
55+
String pipelineBody = "{\"processors\": [{\"enrich\": {\"policy_name\":\"" + policyName +
56+
"\", \"enrich_values\": [{\"source\": \"" + DECORATE_FIELDS[0] + "\", \"target\": \"" + DECORATE_FIELDS[0] + "\"}," +
57+
"{\"source\": \"" + DECORATE_FIELDS[1] + "\", \"target\": \"" + DECORATE_FIELDS[1] + "\"}," +
58+
"{\"source\": \"" + DECORATE_FIELDS[2] + "\", \"target\": \"" + DECORATE_FIELDS[2] + "\"}" +
59+
"]}}]}";
60+
PutPipelineRequest putPipelineRequest = new PutPipelineRequest(pipelineName, new BytesArray(pipelineBody), XContentType.JSON);
61+
client().admin().cluster().putPipeline(putPipelineRequest).actionGet();
62+
63+
BulkRequest bulkRequest = new BulkRequest("my-index");
64+
for (int i = 0; i < numDocs; i++) {
65+
IndexRequest indexRequest = new IndexRequest();
66+
indexRequest.id(Integer.toString(i));
67+
indexRequest.setPipeline(pipelineName);
68+
indexRequest.source(Map.of(KEY_FIELD, keys.get(i)));
69+
bulkRequest.add(indexRequest);
70+
}
71+
BulkResponse bulkResponse = client().bulk(bulkRequest).actionGet();
72+
assertThat("Expected no failure, but " + bulkResponse.buildFailureMessage(), bulkResponse.hasFailures(), is(false));
73+
74+
for (int i = 0; i < numDocs; i++) {
75+
GetResponse getResponse = client().get(new GetRequest("my-index", Integer.toString(i))).actionGet();
76+
Map<String, Object> source = getResponse.getSourceAsMap();
77+
assertThat(source.size(), equalTo(1 + DECORATE_FIELDS.length));
78+
for (int j = 0; j < 3; j++) {
79+
String field = DECORATE_FIELDS[j];
80+
assertThat(source.get(field), equalTo(keys.get(i) + j));
81+
}
82+
}
83+
}
84+
85+
private List<String> createSourceIndex(int numDocs) {
86+
Set<String> keys = new HashSet<>();
87+
for (int i = 0; i < numDocs; i++) {
88+
String key;
89+
do {
90+
key = randomAlphaOfLength(16);
91+
} while (keys.add(key) == false);
92+
93+
IndexRequest indexRequest = new IndexRequest(SOURCE_INDEX_NAME);
94+
indexRequest.create(true);
95+
indexRequest.id(key);
96+
indexRequest.source(Map.of(KEY_FIELD, key, DECORATE_FIELDS[0], key + "0",
97+
DECORATE_FIELDS[1], key + "1", DECORATE_FIELDS[2], key + "2"));
98+
client().index(indexRequest).actionGet();
99+
}
100+
client().admin().indices().refresh(new RefreshRequest(SOURCE_INDEX_NAME)).actionGet();
101+
return List.copyOf(keys);
102+
}
103+
104+
}

0 commit comments

Comments
 (0)