Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[MVP2023-255] feat: used search_after instead of scroll API #1224

Merged
merged 1 commit into from
May 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -783,7 +783,7 @@ private List<Map<String, String>> getOpenAndExcepmtedAnnotationForRule(Map<Strin
shouldFilter);
// get all the issues for this ruleId
return ESUtils.getDataFromES(esUrl, indexName.toLowerCase(), null,
mustFilter, null, shouldFilter, fields, 0, totalDocs);
mustFilter, null, shouldFilter, fields, 0, totalDocs, "_docid");
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,7 @@ default List<Map<String, String>> getOpenAndExcepmtedAnnotationForRule(Map<Strin
shouldFilter);
// get all the issues for this ruleId
return ESUtils.getDataFromES(esUrl, indexName.toLowerCase(), null,
mustFilter, null, shouldFilter, fields, 0, totalDocs);
mustFilter, null, shouldFilter, fields, 0, totalDocs, "_docid");
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public void populateExistingIssuesForType(Map<String, String> ruleParam) throws
shouldFilter);
// get all the issues for this ruleId
List<Map<String, String>> existingIssues = ESUtils.getDataFromES(esUrl, indexName.toLowerCase(), null,
mustFilter, mustNotFilter, shouldFilter, fields, 0, totalDocs);
mustFilter, mustNotFilter, shouldFilter, fields, 0, totalDocs, "_docid");
existingIssues.stream().forEach(obj -> {
existingIssuesMapWithAnnotationIdAsKey.put(obj.get(PacmanSdkConstants.ES_DOC_ID_KEY), obj);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,9 @@ public Map<String, IssueException> getIndividualExceptions(String resourceType)

mustFilter.put("range", dateRangeMap);

// TODO: Not querying entire exceptions here. Need to update. For now putting 10k
List<Map<String, String>> exceptions = ESUtils.getDataFromES(ESUtils.getEsUrl(), indexName, type, mustFilter,
null, null, null, 0, 20);
null, null, null, 0, 10000, "issueId");
Map<String, IssueException> individualExceptions = exceptions.stream()
.map(obj -> new IssueException(obj, ExceptionType.INDIVIDUAL))
.collect(Collectors.toMap(IssueException::getIssueId, obj -> obj, (oldval, newval) -> {
Expand Down Expand Up @@ -126,8 +127,9 @@ public Map<String, List<IssueException>> getStickyExceptions(String policyId, St
Map<String, Object> dateRangeMap = new HashMap<String, Object>();
dateRangeMap.put("expiryDate", rangeMap);
mustFilter.put("range", dateRangeMap);
// TODO: not getting entire data here, putting 10K
List<Map<String, String>> exceptions = ESUtils.getDataFromES(ESUtils.getEsUrl(), INDEX_FOR_EXCEPTIONS,
TYPE_FOR_STICKY_EXCEPTIONS, mustFilter, null, null, null, 0, 20);
TYPE_FOR_STICKY_EXCEPTIONS, mustFilter, null, null, null, 0, 10000, null);
List<IssueException> stickyExceptions = exceptions.stream()
.map(obj -> new IssueException(obj, ExceptionType.STICKY)).collect(Collectors.toList());
// clear the must filter
Expand All @@ -138,9 +140,10 @@ public Map<String, List<IssueException>> getStickyExceptions(String policyId, St
Map<String, List<IssueException>> exceptionResourceSetMap = new HashMap<>();
stickyExceptions.forEach(obj -> {
try {
// TODO: not getting entire data here, putting 10K
exemptedResources.put(obj,
ESUtils.getDataFromES(ESUtils.getEsUrl(), obj.getAssetGroup(), resourceType, mustFilter, null,
null, null, 0, 20).stream().map(resource -> resource.get(RESOURCE_ID))
null, null, 0, 10000, null).stream().map(resource -> resource.get(RESOURCE_ID))
.collect(Collectors.toList()));
} catch (Exception e) {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ private ResourceOwner fetchAndBuildResourceOwnerDetails(final String resourceId)
HashMultimap<String, Object> shouldFilter = null;
try{
resourceDetails = ESUtils.getDataFromES(heimdallUrl, HEIMDALL_RESOURCE_INDEX, "",
mustFilter, Maps.newHashMap(), shouldFilter, fields, 0, 10);
mustFilter, Maps.newHashMap(), shouldFilter, fields, 0, 10, null);

if (resourceDetails.size() > 0) {
resourceOwner.setEmailId(findEmail(resourceDetails));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ public static List<Map<String, String>> getResourcesFromEs(String index, String
Long totalDocs = getTotalDocumentCountForIndexAndType(url, index, targetType, effectiveFilter, null, null);
logger.debug("total resource count" + totalDocs);
List<Map<String, String>> details = getDataFromES(url, index.toLowerCase(), targetType.toLowerCase(),
effectiveFilter, null, null, fields, 0, totalDocs);
effectiveFilter, null, null, fields, 0, totalDocs, "_docid");
return details;
}

Expand Down Expand Up @@ -426,8 +426,8 @@ public static String getIssueTypeFromAnnotation(Annotation annotation) {

@SuppressWarnings("unchecked")
public static List<Map<String, String>> getDataFromES(final String url, String dataSource, String entityType,
Map<String, Object> mustFilter, final Map<String, Object> mustNotFilter,
final HashMultimap<String, Object> shouldFilter, List<String> fields, long from, long size)
Map<String, Object> mustFilter, final Map<String, Object> mustNotFilter,
final HashMultimap<String, Object> shouldFilter, List<String> fields, long from, long size, String uniqueColumn)
throws Exception {

// if filter is not null apply filter, this can be a multi value filter
Expand All @@ -439,12 +439,9 @@ public static List<Map<String, String>> getDataFromES(final String url, String d
}

String urlToQuery = url + "/" + dataSource +
// if (!Strings.isNullOrEmpty(entityType)) {
// urlToQueryBuffer.append("/").append(entityType);
// }
"/" + "_search" + "?scroll=" + PacmanSdkConstants.ES_PAGE_SCROLL_TTL;
"/" + "_search";
logger.info("Querying ES with URL1: {}", urlToQuery);
String urlToScroll = url + "/" + "_search" + "/scroll";
String urlToPIT = url + "/" + "_search";
List<Map<String, String>> results = new ArrayList<Map<String, String>>();
// paginate for breaking the response into smaller chunks

Expand All @@ -461,34 +458,56 @@ public static List<Map<String, String>> getDataFromES(final String url, String d
}
}

Map<String, Object> requestBody = new HashMap<String, Object>();
Map<String, Object> requestBody = new HashMap<>();
requestBody.put("size", PacmanSdkConstants.ES_PAGE_SIZE);
requestBody.put(QUERY, CommonUtils.buildQuery(matchFilters, mustNotFilter, shouldFilter));
requestBody.put("_source", fields);
Gson serializer = new GsonBuilder().create();
String request = serializer.toJson(requestBody);
logger.debug("inventory query" + request);
String _scroll_id = null;
for (int index = 0; index <= (size / PacmanSdkConstants.ES_PAGE_SIZE); index++) {
String responseDetails = null;
try {
if (!Strings.isNullOrEmpty(_scroll_id)) {
request = buildScrollRequest(_scroll_id, PacmanSdkConstants.ES_PAGE_SCROLL_TTL);
urlToQuery = urlToScroll;
List<Map<String, Object>> sortList = new ArrayList<>();
Map<String, Object> sortKey = new HashMap<>();
Map<String, Object> orderMap = new HashMap<>();
orderMap.put("order", "desc");
/**
* Add unmapped_type to ignore error in case of no data available in the index
* This would also swallow error if the field also not available in the index.
* So be very careful that this uniqueColumn exists in the index
*/
orderMap.put("unmapped_type", "string");
String sortColumn = uniqueColumn == null ? "_id" : uniqueColumn + ".keyword";
sortKey.put(sortColumn, orderMap);
sortList.add(sortKey);
requestBody.put("sort", sortList);
Gson serializer = new GsonBuilder().disableHtmlEscaping().create();
try {
for (int index = 0; index <= (size / PacmanSdkConstants.ES_PAGE_SIZE); index++) {
String responseDetails;
try {
String request = serializer.toJson(requestBody);
logger.info("Querying ES with URL2: {}", urlToQuery);
logger.debug("inventory query" + request);
responseDetails = CommonUtils.doHttpPost(urlToQuery, request, new HashMap<>());
Map<String, Object> returnObj = processResponseAndSendTheSortObjBack(responseDetails, results);
requestBody.put("search_after", returnObj.get("sortArray"));

} catch (Exception e) {
logger.error("error retrieving inventory from ES", e);
throw e;
}
logger.info("Querying ES with URL2: {}", urlToQuery);
responseDetails = CommonUtils.doHttpPost(urlToQuery, request,new HashMap<>());
_scroll_id = processResponseAndSendTheScrollBack(responseDetails, results);
} catch (Exception e) {
logger.error("error retrieving inventory from ES", e);
throw e;
}

} finally {
// TODO: Delete the PIT
}
// checkDups(results);
return results;
}

private static String getPitId(String urlToQuery) {
String response = null;
response = CommonUtils.doHttpPost(urlToQuery, "", new HashMap<>());
Gson serializer = new GsonBuilder().create();
Map<String, Object> responseDetails = (Map<String, Object>) serializer.fromJson(response, Object.class);
return (String) responseDetails.get("pit_id");
}

/**
* Check dups.
*
Expand All @@ -504,14 +523,18 @@ private static void checkDups(List<Map<String, String>> results) {
/**
* Builds the scroll request.
*
* @param _scroll_id the scroll id
* @param esPageScrollTtl the es page scroll ttl
* @param _pitId the scroll id
* @param requestBody
* @return the string
*/
private static String buildScrollRequest(String _scroll_id, String esPageScrollTtl) {
Map<String, Object> requestBody = new HashMap<String, Object>();
requestBody.put("scroll", PacmanSdkConstants.ES_PAGE_SCROLL_TTL);
requestBody.put("scroll_id", _scroll_id);
private static String buildPITRequest(String _pitId, Map<String, Object> requestBody) {

if (_pitId != null) {
HashMap<String, String> pitBody = new HashMap<>();
pitBody.put("keep_alive", PacmanSdkConstants.ES_PAGE_SCROLL_TTL);
pitBody.put("id", _pitId);
requestBody.put("pit", pitBody);
}
Gson serializer = new GsonBuilder().disableHtmlEscaping().create();
return serializer.toJson(requestBody);
}
Expand All @@ -520,13 +543,14 @@ private static String buildScrollRequest(String _scroll_id, String esPageScrollT
* Process response and send the scroll back.
*
* @param responseDetails the response details
* @param results the results
* @param results the results
* @return the string
*/
private static String processResponseAndSendTheScrollBack(String responseDetails,
List<Map<String, String>> results) {
private static Map<String, Object> processResponseAndSendTheSortObjBack(String responseDetails,
List<Map<String, String>> results) {
Gson serializer = new GsonBuilder().create();
Map<String, Object> response = (Map<String, Object>) serializer.fromJson(responseDetails, Object.class);
Map<String, Object> lastHitDetail = new HashMap<>();
if (response.containsKey("hits")) {
Map<String, Object> hits = (Map<String, Object>) response.get("hits");
if (hits.containsKey("hits")) {
Expand All @@ -539,10 +563,16 @@ private static String processResponseAndSendTheScrollBack(String responseDetails
sources.put(PacmanSdkConstants.ES_DOC_ROUTING_KEY,
hitDetail.get(PacmanSdkConstants.ES_DOC_ROUTING_KEY));
results.add(CommonUtils.flatNestedMap(null, sources));
lastHitDetail = hitDetail;
}
}
}
return (String) response.get("_scroll_id");
// extract the sort array from the last hitDetail object
List<String> sortArray = (List<String>) lastHitDetail.get("sort");
Map<String, Object> returnObject = new HashMap<>();
returnObject.put("sortArray", sortArray);
// returnObject.put("pit_id", response.get("pit_id"));
return returnObject;
}

/**
Expand Down Expand Up @@ -570,7 +600,7 @@ public static Map<String, String> getDocumentForId(String index, String targetTy
filter.put("_id", _id);
List<String> fields = new ArrayList<String>();
List<Map<String, String>> details = getDataFromES(url, index.toLowerCase(), targetType.toLowerCase(), filter,
null, null, fields, 0, 100);
null, null, fields, 0, 100, null);
if (details != null && !details.isEmpty()) {
return details.get(0);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public void setup(){
PowerMockito.when(ReflectionUtils.findFixClass(anyString())).thenReturn(cl);
PowerMockito.when(ReflectionUtils.findAssociatedMethod(anyObject(), anyString())).thenReturn(method);
PowerMockito.when(ESUtils.getTotalDocumentCountForIndexAndType(anyString(), anyString(), anyString(), anyMap(), anyMap(), any(HashMultimap.class))).thenReturn(10L);
PowerMockito.when(ESUtils.getDataFromES(anyString(), anyString(), anyString(), anyMap(), anyMap(), any(HashMultimap.class), anyList(), anyLong(), anyLong())).thenReturn(new ArrayList());
PowerMockito.when(ESUtils.getDataFromES(anyString(), anyString(), anyString(), anyMap(), anyMap(), any(HashMultimap.class), anyList(), anyLong(), anyLong(), anyString())).thenReturn(new ArrayList());
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public void testGetIndividualExceptions() throws Exception{
PowerMockito.when(ESUtils.getEsUrl()).thenReturn("");
try {

PowerMockito.when(ESUtils.getDataFromES(anyString(), anyString(), anyString(), anyMap(), anyMap(), any(HashMultimap.class), anyList(), anyLong(), anyLong())).thenReturn(new ArrayList());
PowerMockito.when(ESUtils.getDataFromES(anyString(), anyString(), anyString(), anyMap(), anyMap(), any(HashMultimap.class), anyList(), anyLong(), anyLong(), anyString())).thenReturn(new ArrayList());
} catch (Exception e) {
}

Expand All @@ -84,7 +84,7 @@ public void testGetStickyExceptions() throws Exception{
PowerMockito.mockStatic(ESUtils.class);
PowerMockito.when(ESUtils.getEsUrl()).thenReturn("");
try {
PowerMockito.when(ESUtils.getDataFromES(anyString(), anyString(), anyString(), anyMap(), anyMap(), any(HashMultimap.class), anyList(), anyLong(), anyLong())).thenReturn(new ArrayList());
PowerMockito.when(ESUtils.getDataFromES(anyString(), anyString(), anyString(), anyMap(), anyMap(), any(HashMultimap.class), anyList(), anyLong(), anyLong(), anyString())).thenReturn(new ArrayList());
} catch (Exception e) {
}

Expand Down