Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(elasticsearch): refactor idHashAlgo setting #11193

Merged
merged 1 commit into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions docker/datahub-gms/env/docker-without-neo4j.env
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,6 @@ PE_CONSUMER_ENABLED=true
UI_INGESTION_ENABLED=true
ENTITY_SERVICE_ENABLE_RETENTION=true

ELASTIC_ID_HASH_ALGO=MD5

# Uncomment to disable persistence of client-side analytics events
# DATAHUB_ANALYTICS_ENABLED=false

Expand Down
2 changes: 0 additions & 2 deletions docker/datahub-gms/env/docker.env
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,6 @@ MCE_CONSUMER_ENABLED=true
PE_CONSUMER_ENABLED=true
UI_INGESTION_ENABLED=true

ELASTIC_ID_HASH_ALGO=MD5

# Uncomment to enable Metadata Service Authentication
METADATA_SERVICE_AUTH_ENABLED=false

Expand Down
2 changes: 0 additions & 2 deletions docker/datahub-mae-consumer/env/docker-without-neo4j.env
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,6 @@ ES_BULK_REFRESH_POLICY=WAIT_UNTIL
GRAPH_SERVICE_IMPL=elasticsearch
ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-mae-consumer/resources/entity-registry.yml

ELASTIC_ID_HASH_ALGO=MD5

# Uncomment to disable persistence of client-side analytics events
# DATAHUB_ANALYTICS_ENABLED=false

Expand Down
2 changes: 0 additions & 2 deletions docker/datahub-mae-consumer/env/docker.env
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ NEO4J_PASSWORD=datahub
GRAPH_SERVICE_IMPL=neo4j
ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-mae-consumer/resources/entity-registry.yml

ELASTIC_ID_HASH_ALGO=MD5

# Uncomment to disable persistence of client-side analytics events
# DATAHUB_ANALYTICS_ENABLED=false

Expand Down
2 changes: 0 additions & 2 deletions docker/datahub-mce-consumer/env/docker-without-neo4j.env
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ MAE_CONSUMER_ENABLED=false
PE_CONSUMER_ENABLED=false
UI_INGESTION_ENABLED=false

ELASTIC_ID_HASH_ALGO=MD5

# Uncomment to configure kafka topic names
# Make sure these names are consistent across the whole deployment
# METADATA_CHANGE_PROPOSAL_TOPIC_NAME=MetadataChangeProposal_v1
Expand Down
2 changes: 0 additions & 2 deletions docker/datahub-mce-consumer/env/docker.env
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ MAE_CONSUMER_ENABLED=false
PE_CONSUMER_ENABLED=false
UI_INGESTION_ENABLED=false

ELASTIC_ID_HASH_ALGO=MD5

# Uncomment to configure kafka topic names
# Make sure these names are consistent across the whole deployment
# METADATA_CHANGE_PROPOSAL_TOPIC_NAME=MetadataChangeProposal_v1
Expand Down
1 change: 0 additions & 1 deletion docker/quickstart/docker-compose-m1.quickstart.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ services:
- ELASTICSEARCH_INDEX_BUILDER_MAPPINGS_REINDEX=true
- ELASTICSEARCH_INDEX_BUILDER_SETTINGS_REINDEX=true
- ELASTICSEARCH_PORT=9200
- ELASTIC_ID_HASH_ALGO=MD5
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
- ENTITY_SERVICE_ENABLE_RETENTION=true
- ES_BULK_REFRESH_POLICY=WAIT_UNTIL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ services:
- ELASTICSEARCH_INDEX_BUILDER_MAPPINGS_REINDEX=true
- ELASTICSEARCH_INDEX_BUILDER_SETTINGS_REINDEX=true
- ELASTICSEARCH_PORT=9200
- ELASTIC_ID_HASH_ALGO=MD5
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
- ENTITY_SERVICE_ENABLE_RETENTION=true
- ES_BULK_REFRESH_POLICY=WAIT_UNTIL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ services:
- ELASTICSEARCH_INDEX_BUILDER_MAPPINGS_REINDEX=true
- ELASTICSEARCH_INDEX_BUILDER_SETTINGS_REINDEX=true
- ELASTICSEARCH_PORT=9200
- ELASTIC_ID_HASH_ALGO=MD5
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
- ENTITY_SERVICE_ENABLE_RETENTION=true
- ES_BULK_REFRESH_POLICY=WAIT_UNTIL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ services:
- ES_BULK_REFRESH_POLICY=WAIT_UNTIL
- GRAPH_SERVICE_IMPL=elasticsearch
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-mae-consumer/resources/entity-registry.yml
- ELASTIC_ID_HASH_ALGO=MD5
hostname: datahub-mae-consumer
image: ${DATAHUB_MAE_CONSUMER_IMAGE:-acryldata/datahub-mae-consumer}:${DATAHUB_VERSION:-head}
ports:
Expand All @@ -38,7 +37,6 @@ services:
- EBEAN_DATASOURCE_USERNAME=datahub
- ELASTICSEARCH_HOST=elasticsearch
- ELASTICSEARCH_PORT=9200
- ELASTIC_ID_HASH_ALGO=MD5
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-mce-consumer/resources/entity-registry.yml
- ENTITY_SERVICE_ENABLE_RETENTION=true
- ES_BULK_REFRESH_POLICY=WAIT_UNTIL
Expand Down
2 changes: 0 additions & 2 deletions docker/quickstart/docker-compose.consumers.quickstart.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ services:
- NEO4J_PASSWORD=datahub
- GRAPH_SERVICE_IMPL=neo4j
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-mae-consumer/resources/entity-registry.yml
- ELASTIC_ID_HASH_ALGO=MD5
hostname: datahub-mae-consumer
image: ${DATAHUB_MAE_CONSUMER_IMAGE:-acryldata/datahub-mae-consumer}:${DATAHUB_VERSION:-head}
ports:
Expand All @@ -48,7 +47,6 @@ services:
- EBEAN_DATASOURCE_USERNAME=datahub
- ELASTICSEARCH_HOST=elasticsearch
- ELASTICSEARCH_PORT=9200
- ELASTIC_ID_HASH_ALGO=MD5
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-mce-consumer/resources/entity-registry.yml
- ENTITY_SERVICE_ENABLE_RETENTION=true
- ES_BULK_REFRESH_POLICY=WAIT_UNTIL
Expand Down
1 change: 0 additions & 1 deletion docker/quickstart/docker-compose.quickstart.yml
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,6 @@ services:
- ELASTICSEARCH_INDEX_BUILDER_MAPPINGS_REINDEX=true
- ELASTICSEARCH_INDEX_BUILDER_SETTINGS_REINDEX=true
- ELASTICSEARCH_PORT=9200
- ELASTIC_ID_HASH_ALGO=MD5
- ENTITY_REGISTRY_CONFIG_PATH=/datahub/datahub-gms/resources/entity-registry.yml
- ENTITY_SERVICE_ENABLE_RETENTION=true
- ES_BULK_REFRESH_POLICY=WAIT_UNTIL
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;
import javax.annotation.Nonnull;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.EqualsAndHashCode;
Expand Down Expand Up @@ -59,7 +60,7 @@ public Edge(
null);
}

public String toDocId() {
public String toDocId(@Nonnull String idHashAlgo) {
StringBuilder rawDocId = new StringBuilder();
rawDocId
.append(getSource().toString())
Expand All @@ -72,9 +73,8 @@ public String toDocId() {
}

try {
String hashAlgo = System.getenv("ELASTIC_ID_HASH_ALGO");
byte[] bytesOfRawDocID = rawDocId.toString().getBytes(StandardCharsets.UTF_8);
MessageDigest md = MessageDigest.getInstance(hashAlgo);
MessageDigest md = MessageDigest.getInstance(idHashAlgo);
byte[] thedigest = md.digest(bytesOfRawDocID);
return Base64.getEncoder().encodeToString(thedigest);
} catch (NoSuchAlgorithmException e) {
Expand Down
1 change: 0 additions & 1 deletion metadata-io/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ test {
// override, testng controlling parallelization
// increasing >1 will merely run all tests extra times
maxParallelForks = 1
environment "ELASTIC_ID_HASH_ALGO", "MD5"
}
useTestNG() {
suites 'src/test/resources/testng.xml'
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class ElasticSearchGraphService implements GraphService, ElasticSearchInd
private final ESGraphWriteDAO _graphWriteDAO;
private final ESGraphQueryDAO _graphReadDAO;
private final ESIndexBuilder _indexBuilder;
private final String idHashAlgo;
public static final String INDEX_NAME = "graph_service_v1";
private static final Map<String, Object> EMPTY_HASH = new HashMap<>();

Expand Down Expand Up @@ -125,7 +126,7 @@ public LineageRegistry getLineageRegistry() {

@Override
public void addEdge(@Nonnull final Edge edge) {
String docId = edge.toDocId();
String docId = edge.toDocId(idHashAlgo);
String edgeDocument = toDocument(edge);
_graphWriteDAO.upsertDocument(docId, edgeDocument);
}
Expand All @@ -137,7 +138,7 @@ public void upsertEdge(@Nonnull final Edge edge) {

@Override
public void removeEdge(@Nonnull final Edge edge) {
String docId = edge.toDocId();
String docId = edge.toDocId(idHashAlgo);
_graphWriteDAO.deleteDocument(docId);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public class UpdateIndicesService implements SearchIndicesService {
private final SystemMetadataService _systemMetadataService;
private final SearchDocumentTransformer _searchDocumentTransformer;
private final EntityIndexBuilders _entityIndexBuilders;
@Nonnull private final String idHashAlgo;

@Value("${featureFlags.graphServiceDiffModeEnabled:true}")
private boolean _graphDiffMode;
Expand Down Expand Up @@ -117,13 +118,15 @@ public UpdateIndicesService(
TimeseriesAspectService timeseriesAspectService,
SystemMetadataService systemMetadataService,
SearchDocumentTransformer searchDocumentTransformer,
EntityIndexBuilders entityIndexBuilders) {
EntityIndexBuilders entityIndexBuilders,
@Nonnull String idHashAlgo) {
_graphService = graphService;
_entitySearchService = entitySearchService;
_timeseriesAspectService = timeseriesAspectService;
_systemMetadataService = systemMetadataService;
_searchDocumentTransformer = searchDocumentTransformer;
_entityIndexBuilders = entityIndexBuilders;
this.idHashAlgo = idHashAlgo;
}

@Override
Expand Down Expand Up @@ -601,7 +604,9 @@ private void updateTimeseriesFields(
SystemMetadata systemMetadata) {
Map<String, JsonNode> documents;
try {
documents = TimeseriesAspectTransformer.transform(urn, aspect, aspectSpec, systemMetadata);
documents =
TimeseriesAspectTransformer.transform(
urn, aspect, aspectSpec, systemMetadata, idHashAlgo);
} catch (JsonProcessingException e) {
log.error("Failed to generate timeseries document from aspect: {}", e.toString());
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ public static Map<String, JsonNode> transform(
@Nonnull final Urn urn,
@Nonnull final RecordTemplate timeseriesAspect,
@Nonnull final AspectSpec aspectSpec,
@Nullable final SystemMetadata systemMetadata)
@Nullable final SystemMetadata systemMetadata,
@Nonnull final String idHashAlgo)
throws JsonProcessingException {
ObjectNode commonDocument = getCommonDocument(urn, timeseriesAspect, systemMetadata);
Map<String, JsonNode> finalDocuments = new HashMap<>();
Expand All @@ -74,7 +75,7 @@ public static Map<String, JsonNode> transform(
final Map<TimeseriesFieldSpec, List<Object>> timeseriesFieldValueMap =
FieldExtractor.extractFields(timeseriesAspect, aspectSpec.getTimeseriesFieldSpecs());
timeseriesFieldValueMap.forEach((k, v) -> setTimeseriesField(document, k, v));
finalDocuments.put(getDocId(document, null), document);
finalDocuments.put(getDocId(document, null, idHashAlgo), document);

// Create new rows for the member collection fields.
final Map<TimeseriesFieldCollectionSpec, List<Object>> timeseriesFieldCollectionValueMap =
Expand All @@ -83,7 +84,7 @@ public static Map<String, JsonNode> transform(
timeseriesFieldCollectionValueMap.forEach(
(key, values) ->
finalDocuments.putAll(
getTimeseriesFieldCollectionDocuments(key, values, commonDocument)));
getTimeseriesFieldCollectionDocuments(key, values, commonDocument, idHashAlgo)));
return finalDocuments;
}

Expand Down Expand Up @@ -216,12 +217,13 @@ private static void setTimeseriesField(
private static Map<String, JsonNode> getTimeseriesFieldCollectionDocuments(
final TimeseriesFieldCollectionSpec fieldSpec,
final List<Object> values,
final ObjectNode commonDocument) {
final ObjectNode commonDocument,
@Nonnull final String idHashAlgo) {
return values.stream()
.map(value -> getTimeseriesFieldCollectionDocument(fieldSpec, value, commonDocument))
.collect(
Collectors.toMap(
keyDocPair -> getDocId(keyDocPair.getSecond(), keyDocPair.getFirst()),
keyDocPair -> getDocId(keyDocPair.getSecond(), keyDocPair.getFirst(), idHashAlgo),
Pair::getSecond));
}

Expand Down Expand Up @@ -257,9 +259,9 @@ private static Pair<String, ObjectNode> getTimeseriesFieldCollectionDocument(
finalDocument);
}

private static String getDocId(@Nonnull JsonNode document, String collectionId)
private static String getDocId(
@Nonnull JsonNode document, String collectionId, @Nonnull String idHashAlgo)
throws IllegalArgumentException {
String hashAlgo = System.getenv("ELASTIC_ID_HASH_ALGO");
String docId = document.get(MappingsBuilder.TIMESTAMP_MILLIS_FIELD).toString();
JsonNode eventGranularity = document.get(MappingsBuilder.EVENT_GRANULARITY);
if (eventGranularity != null) {
Expand All @@ -278,9 +280,9 @@ private static String getDocId(@Nonnull JsonNode document, String collectionId)
docId += partitionSpec.toString();
}

if (hashAlgo.equalsIgnoreCase("SHA-256")) {
if (idHashAlgo.equalsIgnoreCase("SHA-256")) {
return DigestUtils.sha256Hex(docId);
} else if (hashAlgo.equalsIgnoreCase("MD5")) {
} else if (idHashAlgo.equalsIgnoreCase("MD5")) {
return DigestUtils.md5Hex(docId);
}
throw new IllegalArgumentException("Hash function not handled !");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public abstract class SearchGraphServiceTestBase extends GraphServiceTestBase {
@Nonnull
protected abstract ESIndexBuilder getIndexBuilder();

private final IndexConvention _indexConvention = IndexConventionImpl.NO_PREFIX;
private final IndexConvention _indexConvention = IndexConventionImpl.noPrefix("MD5");
private final String _indexName = _indexConvention.getIndexName(INDEX_NAME);
private ElasticSearchGraphService _client;

Expand Down Expand Up @@ -108,7 +108,8 @@ private ElasticSearchGraphService buildService(boolean enableMultiPathSearch) {
_indexConvention,
writeDAO,
readDAO,
getIndexBuilder());
getIndexBuilder(),
"MD5");
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ public void setup() throws RemoteInvocationException, URISyntaxException {
operationContext =
TestOperationContexts.systemContextNoSearchAuthorization(
new SnapshotEntityRegistry(new Snapshot()),
new IndexConventionImpl("lineage_search_service_test"))
new IndexConventionImpl("lineage_search_service_test", "MD5"))
.asSession(RequestContext.TEST, Authorizer.EMPTY, TestOperationContexts.TEST_USER_AUTH);
settingsBuilder = new SettingsBuilder(null);
elasticSearchService = buildEntitySearchService();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public void setup() throws RemoteInvocationException, URISyntaxException {
operationContext =
TestOperationContexts.systemContextNoSearchAuthorization(
new SnapshotEntityRegistry(new Snapshot()),
new IndexConventionImpl("search_service_test"))
new IndexConventionImpl("search_service_test", "MD5"))
.asSession(RequestContext.TEST, Authorizer.EMPTY, TestOperationContexts.TEST_USER_AUTH);

settingsBuilder = new SettingsBuilder(null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,8 @@ public abstract class TestEntityTestBase extends AbstractTestNGSpringContextTest
public void setup() {
opContext =
TestOperationContexts.systemContextNoSearchAuthorization(
new SnapshotEntityRegistry(new Snapshot()), new IndexConventionImpl("es_service_test"));
new SnapshotEntityRegistry(new Snapshot()),
new IndexConventionImpl("es_service_test", "MD5"));
settingsBuilder = new SettingsBuilder(null);
elasticSearchService = buildService();
elasticSearchService.reindexAll(Collections.emptySet());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public void setup() throws RemoteInvocationException, URISyntaxException {
mockClient = mock(RestHighLevelClient.class);
opContext =
TestOperationContexts.systemContextNoSearchAuthorization(
new IndexConventionImpl("es_browse_dao_test"));
new IndexConventionImpl("es_browse_dao_test", "MD5"));
browseDAO = new ESBrowseDAO(mockClient, searchConfiguration, customSearchConfiguration);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public abstract class SystemMetadataServiceTestBase extends AbstractTestNGSpring
protected abstract ESIndexBuilder getIndexBuilder();

private final IndexConvention _indexConvention =
new IndexConventionImpl("es_system_metadata_service_test");
new IndexConventionImpl("es_system_metadata_service_test", "MD5");

private ElasticSearchSystemMetadataService _client;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ public void setup() throws RemoteInvocationException, URISyntaxException {

opContext =
TestOperationContexts.systemContextNoSearchAuthorization(
entityRegistry, new IndexConventionImpl("es_timeseries_aspect_service_test"));
entityRegistry, new IndexConventionImpl("es_timeseries_aspect_service_test", "MD5"));

elasticSearchTimeseriesAspectService = buildService();
elasticSearchTimeseriesAspectService.reindexAll(Collections.emptySet());
Expand All @@ -152,7 +152,7 @@ private ElasticSearchTimeseriesAspectService buildService() {

private void upsertDocument(TestEntityProfile dp, Urn urn) throws JsonProcessingException {
Map<String, JsonNode> documents =
TimeseriesAspectTransformer.transform(urn, dp, aspectSpec, null);
TimeseriesAspectTransformer.transform(urn, dp, aspectSpec, null, "MD5");
assertEquals(documents.size(), 3);
documents.forEach(
(key, value) ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,12 +86,12 @@ protected String longTailIndexPrefix() {

@Bean(name = "sampleDataIndexConvention")
protected IndexConvention indexConvention(@Qualifier("sampleDataPrefix") String prefix) {
return new IndexConventionImpl(prefix);
return new IndexConventionImpl(prefix, "MD5");
}

@Bean(name = "longTailIndexConvention")
protected IndexConvention longTailIndexConvention(@Qualifier("longTailPrefix") String prefix) {
return new IndexConventionImpl(prefix);
return new IndexConventionImpl(prefix, "MD5");
}

@Bean(name = "sampleDataFixtureName")
Expand Down
Loading
Loading