Skip to content

Commit

Permalink
[feat][elasticsearch] Add hashed id support (apache#72)
Browse files Browse the repository at this point in the history
* [feat][elasticsearch] Add hashed id support

* add config default test

(cherry picked from commit 4fdb34e)
  • Loading branch information
nicoloboschi committed Apr 20, 2022
1 parent 3b045ad commit 7a40cfa
Show file tree
Hide file tree
Showing 7 changed files with 216 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,12 @@ public class ElasticSearchConfig implements Serializable {
)
private boolean canonicalKeyFields = false;

@FieldDoc(
defaultValue = "NONE",
help = "Hashing algorithm to use for the document id. This is useful in order to be complaint with the ElasticSearch _id hard limit of 512 bytes."
)
private IdHashingAlgorithm idHashingAlgorithm = IdHashingAlgorithm.NONE;

public enum MalformedDocAction {
IGNORE,
WARN,
Expand All @@ -312,6 +318,12 @@ public enum CompatibilityMode {
OPENSEARCH
}

public enum IdHashingAlgorithm {
NONE,
SHA256,
SHA512
}

public static ElasticSearchConfig load(String yamlFile) throws IOException {
ObjectMapper mapper = new ObjectMapper(new YAMLFactory());
return mapper.readValue(new File(yamlFile), ElasticSearchConfig.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.api.Message;
Expand All @@ -46,6 +48,7 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Base64;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
Expand All @@ -65,6 +68,7 @@ public class ElasticSearchSink implements Sink<GenericObject> {
private final ObjectMapper objectMapper = new ObjectMapper();
private ObjectMapper sortedObjectMapper;
private List<String> primaryFields = null;
private final Base64.Encoder base64Encoder = Base64.getEncoder().withoutPadding();

@Override
public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception {
Expand Down Expand Up @@ -221,6 +225,27 @@ public Pair<String, String> extractIdAndDocument(Record<GenericObject> record) t
}
}

final ElasticSearchConfig.IdHashingAlgorithm idHashingAlgorithm =
elasticSearchConfig.getIdHashingAlgorithm();
if (id != null
&& idHashingAlgorithm != null
&& idHashingAlgorithm != ElasticSearchConfig.IdHashingAlgorithm.NONE) {
Hasher hasher;
switch (idHashingAlgorithm) {
case SHA256:
hasher = Hashing.sha256().newHasher();
break;
case SHA512:
hasher = Hashing.sha512().newHasher();
break;
default:
throw new UnsupportedOperationException("Unsupported IdHashingAlgorithm: " +
idHashingAlgorithm);
}
hasher.putString(id, StandardCharsets.UTF_8);
id = base64Encoder.encodeToString(hasher.hash().asBytes());
}

if (log.isDebugEnabled()) {
SchemaType schemaType = null;
if (record.getSchema() != null && record.getSchema().getSchemaInfo() != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ public RestClient(ElasticSearchConfig elasticSearchConfig, BulkProcessor.Listene
public abstract boolean deleteDocument(String index, String documentId) throws IOException;

public abstract long totalHits(String index) throws IOException;
public abstract long totalHits(String index, String query) throws IOException;

public abstract BulkProcessor getBulkProcessor();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,19 +169,29 @@ public boolean indexDocument(String index, String documentId, String documentSou
}
}

@VisibleForTesting
public SearchResponse<Map> search(String indexName) throws IOException {
return search(indexName, "*:*");
}

@VisibleForTesting
public SearchResponse<Map> search(String indexName, String query) throws IOException {
final RefreshRequest refreshRequest = new RefreshRequest.Builder().index(indexName).build();
client.indices().refresh(refreshRequest);

query = query.replace("/", "\\/");
return client.search(new SearchRequest.Builder().index(indexName)
.q("*:*")
.q(query)
.build(), Map.class);
}

@Override
public long totalHits(String indexName) throws IOException {
final SearchResponse<Map> searchResponse = search(indexName);
return totalHits(indexName, "*:*");
}

@Override
public long totalHits(String indexName, String query) throws IOException {
final SearchResponse<Map> searchResponse = search(indexName, query);
return searchResponse.hits().total().value();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.opensearch.common.unit.ByteSizeValue;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.xcontent.XContentType;
import org.opensearch.index.query.QueryBuilder;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.search.builder.SearchSourceBuilder;

Expand Down Expand Up @@ -265,13 +266,32 @@ public long totalHits(String indexName) throws IOException {
return search(indexName).getHits().getTotalHits().value;
}

@Override
public long totalHits(String indexName, String query) throws IOException {
return search(indexName, query).getHits().getTotalHits().value;
}

@VisibleForTesting
public SearchResponse search(String indexName) throws IOException {
return search(indexName, "*:*");
}

@VisibleForTesting
public SearchResponse search(String indexName, String query) throws IOException {
client.indices().refresh(new RefreshRequest(indexName), RequestOptions.DEFAULT);
QueryBuilder queryBuilder;
if ("*:*".equals(query)) {
queryBuilder = QueryBuilders.matchAllQuery();
} else {
final String[] split = query.split(":");
final String name = split[0];
final String text = split[1];
queryBuilder = QueryBuilders.matchQuery(name, text);
}
return client.search(
new SearchRequest()
.indices(indexName)
.source(new SearchSourceBuilder().query(QueryBuilders.matchAllQuery())),
.source(new SearchSourceBuilder().query(queryBuilder)) ,
RequestOptions.DEFAULT);
}
@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,8 @@ public final void defaultValueTest() throws IOException {
assertEquals(config.getSsl().getProtocols(), "TLSv1.2");

assertEquals(config.getCompatibilityMode(), ElasticSearchConfig.CompatibilityMode.AUTO);
assertEquals(config.isCanonicalKeyFields(), false);
assertEquals(config.getIdHashingAlgorithm(), ElasticSearchConfig.IdHashingAlgorithm.NONE);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.TypedMessageBuilder;
import org.apache.pulsar.client.api.schema.GenericObject;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.api.schema.GenericSchema;
import org.apache.pulsar.client.api.schema.RecordSchemaBuilder;
import org.apache.pulsar.client.api.schema.SchemaBuilder;
import org.apache.pulsar.client.impl.MessageImpl;
import org.apache.pulsar.client.impl.schema.generic.GenericJsonSchema;
import org.apache.pulsar.common.schema.KeyValue;
Expand All @@ -48,14 +49,19 @@
import org.testng.annotations.AfterClass;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.mockito.Mockito.*;
import static org.testng.Assert.assertEquals;
Expand Down Expand Up @@ -227,14 +233,19 @@ public final void sendKeyIgnoreSingleField() throws Exception {
verify(mockRecord, times(1)).ack();
assertEquals(sink.getElasticsearchClient().getRestClient().totalHits(index), 1L);

String value = getHitIdAtIndex(index, 0);
assertEquals(value, "bob");
}

private String getHitIdAtIndex(String indexName, int index) throws IOException {
if (elasticImageName.equals(ELASTICSEARCH_8)) {
final ElasticSearchJavaRestClient restClient = (ElasticSearchJavaRestClient)
sink.getElasticsearchClient().getRestClient();
assertEquals(restClient.search(index).hits().hits().get(0).id(), "bob");
return restClient.search(indexName).hits().hits().get(index).id();
} else {
final OpenSearchHighLevelRestClient restClient = (OpenSearchHighLevelRestClient)
sink.getElasticsearchClient().getRestClient();
assertEquals(restClient.search(index).getHits().getHits()[0].getId(), "bob");
return restClient.search(indexName).getHits().getHits()[0].getId();
}
}

Expand All @@ -248,15 +259,7 @@ public final void sendKeyIgnoreMultipleFields() throws Exception {
send(1);
verify(mockRecord, times(1)).ack();
assertEquals(sink.getElasticsearchClient().getRestClient().totalHits(index), 1L);
if (elasticImageName.equals(ELASTICSEARCH_8)) {
final ElasticSearchJavaRestClient restClient = (ElasticSearchJavaRestClient)
sink.getElasticsearchClient().getRestClient();
assertEquals(restClient.search(index).hits().hits().get(0).id(), "[\"bob\",\"boby\"]");
} else {
final OpenSearchHighLevelRestClient restClient = (OpenSearchHighLevelRestClient)
sink.getElasticsearchClient().getRestClient();
assertEquals(restClient.search(index).getHits().getHits()[0].getId(), "[\"bob\",\"boby\"]");
}
assertEquals("[\"bob\",\"boby\"]", getHitIdAtIndex(index, 0));
}

protected final void send(int numRecords) throws Exception {
Expand Down Expand Up @@ -422,4 +425,131 @@ public void testCloseClient() throws Exception {
sink.close();
}
}

@DataProvider(name = "IdHashingAlgorithm")
public Object[] schemaType() {
return new Object[]{
ElasticSearchConfig.IdHashingAlgorithm.SHA256,
ElasticSearchConfig.IdHashingAlgorithm.SHA512
};
}

@Test(dataProvider = "IdHashingAlgorithm")
public final void testHashKey(ElasticSearchConfig.IdHashingAlgorithm algorithm) throws Exception {
when(mockRecord.getKey()).thenAnswer((Answer<Optional<String>>) invocation -> Optional.of( "record-key"));
final String indexName = "test-index" + UUID.randomUUID();
map.put("indexName", indexName);
map.put("keyIgnore", "false");
map.put("idHashingAlgorithm", algorithm.toString());
sink.open(map, mockSinkContext);
send(10);
verify(mockRecord, times(10)).ack();
final String expectedHashedValue = algorithm == ElasticSearchConfig.IdHashingAlgorithm.SHA256 ?
"gbY32PzSxtpjWeaWMROhFw3nleS3JbhNHgtM/Z7FjOk" :
"BBaia6VUM0KGsZVJGOyte6bDNXW0nfkV/zNntc737Nk7HwtDZjZmeyezYwEVQ5cfHIHDFR1e9yczUBwf8zw0rw";
final long count = sink.getElasticsearchClient().getRestClient()
.totalHits(indexName, "_id:" + expectedHashedValue);
assertEquals(count, 1);
}

@Test
public final void testKeyValueHashAndCanonicalOutput() throws Exception {
RecordSchemaBuilder keySchemaBuilder = SchemaBuilder.record("key");
keySchemaBuilder.field("keyFieldB").type(SchemaType.STRING).optional().defaultValue(null);
keySchemaBuilder.field("keyFieldA").type(SchemaType.STRING).optional().defaultValue(null);
GenericSchema<GenericRecord> keySchema = Schema.generic(keySchemaBuilder.build(SchemaType.JSON));

// more than 512 bytes to break the _id size limitation
final String keyFieldBValue = Stream.generate(() -> "keyB").limit(1000).collect(Collectors.joining());
GenericRecord keyGenericRecord = keySchema.newRecordBuilder()
.set("keyFieldB", keyFieldBValue)
.set("keyFieldA", "keyA")
.build();

GenericRecord keyGenericRecord2 = keySchema.newRecordBuilder()
.set("keyFieldA", "keyA")
.set("keyFieldB", keyFieldBValue)
.build();
Record<GenericObject> genericObjectRecord = createKeyValueGenericRecordWithGenericKeySchema(
keySchema, keyGenericRecord);
Record<GenericObject> genericObjectRecord2 = createKeyValueGenericRecordWithGenericKeySchema(
keySchema, keyGenericRecord2);
final String indexName = "test-index" + UUID.randomUUID();
map.put("indexName", indexName);
map.put("keyIgnore", "false");
map.put("nullValueAction", ElasticSearchConfig.NullValueAction.DELETE.toString());
map.put("canonicalKeyFields", "true");
map.put("idHashingAlgorithm", ElasticSearchConfig.IdHashingAlgorithm.SHA512);
sink.open(map, mockSinkContext);
for (int idx = 0; idx < 10; idx++) {
sink.write(genericObjectRecord);
}
for (int idx = 0; idx < 10; idx++) {
sink.write(genericObjectRecord2);
}
final String expectedHashedValue = "7BmM3pkYIbhm8cPN5ePd/BeZ7lYZnKhzmiJ62k0PsGNNAQdk" +
"S+/te9+NKpdy31lEN0jT1MVrBjYIj4O08QsU1g";
long count = sink.getElasticsearchClient().getRestClient()
.totalHits(indexName, "_id:" + expectedHashedValue);
assertEquals(count, 1);


Record<GenericObject> genericObjectRecordDelete = createKeyValueGenericRecordWithGenericKeySchema(
keySchema, keyGenericRecord, true);
sink.write(genericObjectRecordDelete);
count = sink.getElasticsearchClient().getRestClient()
.totalHits(indexName, "_id:" + expectedHashedValue);
assertEquals(count, 0);

}

private Record<GenericObject> createKeyValueGenericRecordWithGenericKeySchema(
GenericSchema<GenericRecord> keySchema,
GenericRecord keyGenericRecord) {
return createKeyValueGenericRecordWithGenericKeySchema(
keySchema,
keyGenericRecord,
false
);
}

private Record<GenericObject> createKeyValueGenericRecordWithGenericKeySchema(
GenericSchema<GenericRecord> keySchema,
GenericRecord keyGenericRecord, boolean emptyValue) {

Schema<KeyValue<GenericRecord, GenericRecord>> keyValueSchema =
Schema.KeyValue(keySchema, genericSchema, KeyValueEncodingType.INLINE);
KeyValue<GenericRecord, GenericRecord> keyValue = new KeyValue<>(keyGenericRecord,
emptyValue ? null: userProfile);
GenericObject genericObject = new GenericObject() {
@Override
public SchemaType getSchemaType() {
return SchemaType.KEY_VALUE;
}

@Override
public Object getNativeObject() {
return keyValue;
}
};
Record<GenericObject> genericObjectRecord = new Record<GenericObject>() {
@Override
public Optional<String> getTopicName() {
return Optional.of("topic-name");
}

@Override
public Schema getSchema() {
return keyValueSchema;
}

@Override
public GenericObject getValue() {
return genericObject;
}
};
return genericObjectRecord;
}


}

0 comments on commit 7a40cfa

Please sign in to comment.