Skip to content

Commit

Permalink
Support for multiple indices in detector input (#336) (#357)
Browse files Browse the repository at this point in the history
Signed-off-by: Petar Dzepina <petar.dzepina@gmail.com>
  • Loading branch information
opensearch-trigger-bot[bot] authored Feb 22, 2023
1 parent 2323667 commit 163d7ce
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -233,10 +233,10 @@ private void createMonitorFromQueries(String index, List<Pair<String, Rule>> rul
List<IndexMonitorRequest> monitorRequests = new ArrayList<>();

if (!docLevelRules.isEmpty()) {
monitorRequests.add(createDocLevelMonitorRequest(Pair.of(index, docLevelRules), detector, refreshPolicy, Monitor.NO_ID, Method.POST));
monitorRequests.add(createDocLevelMonitorRequest(docLevelRules, detector, refreshPolicy, Monitor.NO_ID, Method.POST));
}
if (!bucketLevelRules.isEmpty()) {
monitorRequests.addAll(buildBucketLevelMonitorRequests(Pair.of(index, bucketLevelRules), detector, refreshPolicy, Monitor.NO_ID, Method.POST));
monitorRequests.addAll(buildBucketLevelMonitorRequests(bucketLevelRules, detector, refreshPolicy, Monitor.NO_ID, Method.POST));
}
// Do nothing if detector doesn't have any monitor
if (monitorRequests.isEmpty()){
Expand Down Expand Up @@ -303,15 +303,13 @@ private void updateMonitorFromQueries(String index, List<Pair<String, Rule>> rul
if (monitorPerRule.containsKey(rule.getId())) {
String monitorId = monitorPerRule.get(rule.getId());
monitorsToBeUpdated.add(createBucketLevelMonitorRequest(query.getRight(),
index,
detector,
refreshPolicy,
monitorId,
Method.PUT,
queryBackendMap.get(rule.getCategory())));
} else {
monitorsToBeAdded.add(createBucketLevelMonitorRequest(query.getRight(),
index,
detector,
refreshPolicy,
Monitor.NO_ID,
Expand All @@ -328,9 +326,9 @@ private void updateMonitorFromQueries(String index, List<Pair<String, Rule>> rul
// Process doc level monitors
if (!docLevelRules.isEmpty()) {
if (detector.getDocLevelMonitorId() == null) {
monitorsToBeAdded.add(createDocLevelMonitorRequest(Pair.of(index, docLevelRules), detector, refreshPolicy, Monitor.NO_ID, Method.POST));
monitorsToBeAdded.add(createDocLevelMonitorRequest(docLevelRules, detector, refreshPolicy, Monitor.NO_ID, Method.POST));
} else {
monitorsToBeUpdated.add(createDocLevelMonitorRequest(Pair.of(index, docLevelRules), detector, refreshPolicy, detector.getDocLevelMonitorId(), Method.PUT));
monitorsToBeUpdated.add(createDocLevelMonitorRequest(docLevelRules, detector, refreshPolicy, detector.getDocLevelMonitorId(), Method.PUT));
}
}

Expand Down Expand Up @@ -394,12 +392,12 @@ private void updateAlertingMonitors(
}, listener::onFailure);
}

private IndexMonitorRequest createDocLevelMonitorRequest(Pair<String, List<Pair<String, Rule>>> logIndexToQueries, Detector detector, WriteRequest.RefreshPolicy refreshPolicy, String monitorId, RestRequest.Method restMethod) {
private IndexMonitorRequest createDocLevelMonitorRequest(List<Pair<String, Rule>> queries, Detector detector, WriteRequest.RefreshPolicy refreshPolicy, String monitorId, RestRequest.Method restMethod) {
List<DocLevelMonitorInput> docLevelMonitorInputs = new ArrayList<>();

List<DocLevelQuery> docLevelQueries = new ArrayList<>();

for (Pair<String, Rule> query: logIndexToQueries.getRight()) {
for (Pair<String, Rule> query: queries) {
String id = query.getLeft();

Rule rule = query.getRight();
Expand All @@ -415,7 +413,7 @@ private IndexMonitorRequest createDocLevelMonitorRequest(Pair<String, List<Pair<
DocLevelQuery docLevelQuery = new DocLevelQuery(id, name, actualQuery, tags);
docLevelQueries.add(docLevelQuery);
}
DocLevelMonitorInput docLevelMonitorInput = new DocLevelMonitorInput(detector.getName(), List.of(logIndexToQueries.getKey()), docLevelQueries);
DocLevelMonitorInput docLevelMonitorInput = new DocLevelMonitorInput(detector.getName(), detector.getInputs().get(0).getIndices(), docLevelQueries);
docLevelMonitorInputs.add(docLevelMonitorInput);

List<DocumentLevelTrigger> triggers = new ArrayList<>();
Expand Down Expand Up @@ -445,8 +443,8 @@ private IndexMonitorRequest createDocLevelMonitorRequest(Pair<String, List<Pair<
return new IndexMonitorRequest(monitorId, SequenceNumbers.UNASSIGNED_SEQ_NO, SequenceNumbers.UNASSIGNED_PRIMARY_TERM, refreshPolicy, restMethod, monitor, null);
}

private List<IndexMonitorRequest> buildBucketLevelMonitorRequests(Pair<String, List<Pair<String, Rule>>> logIndexToQueries, Detector detector, WriteRequest.RefreshPolicy refreshPolicy, String monitorId, RestRequest.Method restMethod) throws IOException, SigmaError {
List<String> ruleCategories = logIndexToQueries.getRight().stream().map(Pair::getRight).map(Rule::getCategory).distinct().collect(
private List<IndexMonitorRequest> buildBucketLevelMonitorRequests(List<Pair<String, Rule>> queries, Detector detector, WriteRequest.RefreshPolicy refreshPolicy, String monitorId, RestRequest.Method restMethod) throws IOException, SigmaError {
List<String> ruleCategories = queries.stream().map(Pair::getRight).map(Rule::getCategory).distinct().collect(
Collectors.toList());
Map<String, QueryBackend> queryBackendMap = new HashMap<>();

Expand All @@ -456,14 +454,13 @@ private List<IndexMonitorRequest> buildBucketLevelMonitorRequests(Pair<String, L

List<IndexMonitorRequest> monitorRequests = new ArrayList<>();

for (Pair<String, Rule> query: logIndexToQueries.getRight()) {
for (Pair<String, Rule> query: queries) {
Rule rule = query.getRight();

// Creating bucket level monitor per each aggregation rule
if (rule.getAggregationQueries() != null){
monitorRequests.add(createBucketLevelMonitorRequest(
query.getRight(),
logIndexToQueries.getLeft(),
detector,
refreshPolicy,
Monitor.NO_ID,
Expand All @@ -476,13 +473,15 @@ private List<IndexMonitorRequest> buildBucketLevelMonitorRequests(Pair<String, L

private IndexMonitorRequest createBucketLevelMonitorRequest(
Rule rule,
String index,
Detector detector,
WriteRequest.RefreshPolicy refreshPolicy,
String monitorId,
RestRequest.Method restMethod,
QueryBackend queryBackend
) throws SigmaError {

List<String> indices = detector.getInputs().get(0).getIndices();

AggregationQueries aggregationQueries = queryBackend.convertAggregation(rule.getAggregationItemsFromRule().get(0));

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder()
Expand All @@ -491,10 +490,11 @@ private IndexMonitorRequest createBucketLevelMonitorRequest(
// Build query string filter
.query(QueryBuilders.queryStringQuery(rule.getQueries().get(0).getValue()))
.aggregation(aggregationQueries.getAggBuilder());
String concreteIndex = IndexUtils.getNewIndexByCreationDate( // index variable in method signature can also be an index pattern
// input index can also be an index pattern or alias so we have to resolve it to concrete index
String concreteIndex = IndexUtils.getNewIndexByCreationDate(
clusterService.state(),
indexNameExpressionResolver,
index
indices.get(0) // taking first one is fine because we expect that all indices in list share same mappings
);
try {
GetIndexMappingsResponse getIndexMappingsResponse = client.execute(
Expand Down Expand Up @@ -526,7 +526,7 @@ private IndexMonitorRequest createBucketLevelMonitorRequest(
}

List<SearchInput> bucketLevelMonitorInputs = new ArrayList<>();
bucketLevelMonitorInputs.add(new SearchInput(Arrays.asList(index), searchSourceBuilder));
bucketLevelMonitorInputs.add(new SearchInput(indices, searchSourceBuilder));

List<BucketLevelTrigger> triggers = new ArrayList<>();
BucketLevelTrigger bucketLevelTrigger = new BucketLevelTrigger(rule.getId(), rule.getTitle(), rule.getLevel(), aggregationQueries.getCondition(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,81 @@ public void testCreatingADetector() throws IOException {
Assert.assertEquals(5, noOfSigmaRuleMatches);
}

public void testCreatingADetectorWithMultipleIndices() throws IOException {
String index1 = createTestIndex("windows-1", windowsIndexMapping());
String index2 = createTestIndex("windows-2", windowsIndexMapping());
// Execute CreateMappingsAction to add alias mapping for index
Request createMappingRequest = new Request("POST", SecurityAnalyticsPlugin.MAPPER_BASE_URI);
// both req params and req body are supported
createMappingRequest.setJsonEntity(
"{ \"index_name\":\"windows*\"," +
" \"rule_topic\":\"" + randomDetectorType() + "\", " +
" \"partial\":true" +
"}"
);

Response response = client().performRequest(createMappingRequest);
assertEquals(HttpStatus.SC_OK, response.getStatusLine().getStatusCode());

Detector detector = randomDetectorWithTriggers(
getRandomPrePackagedRules(),
List.of(new DetectorTrigger(null, "test-trigger", "1", List.of(randomDetectorType()), List.of(), List.of(), List.of(), List.of())),
List.of(index1, index2)
);

Response createResponse = makeRequest(client(), "POST", SecurityAnalyticsPlugin.DETECTOR_BASE_URI, Collections.emptyMap(), toHttpEntity(detector));
Assert.assertEquals("Create detector failed", RestStatus.CREATED, restStatus(createResponse));

Map<String, Object> responseBody = asMap(createResponse);

String createdId = responseBody.get("_id").toString();
int createdVersion = Integer.parseInt(responseBody.get("_version").toString());
Assert.assertNotEquals("response is missing Id", Detector.NO_ID, createdId);
Assert.assertTrue("incorrect version", createdVersion > 0);
Assert.assertEquals("Incorrect Location header", String.format(Locale.getDefault(), "%s/%s", SecurityAnalyticsPlugin.DETECTOR_BASE_URI, createdId), createResponse.getHeader("Location"));
Assert.assertFalse(((Map<String, Object>) responseBody.get("detector")).containsKey("rule_topic_index"));
Assert.assertFalse(((Map<String, Object>) responseBody.get("detector")).containsKey("findings_index"));
Assert.assertFalse(((Map<String, Object>) responseBody.get("detector")).containsKey("alert_index"));

String detectorTypeInResponse = (String) ((Map<String, Object>)responseBody.get("detector")).get("detector_type");
Assert.assertEquals("Detector type incorrect", randomDetectorType().toLowerCase(Locale.ROOT), detectorTypeInResponse);

String request = "{\n" +
" \"query\" : {\n" +
" \"match\":{\n" +
" \"_id\": \"" + createdId + "\"\n" +
" }\n" +
" }\n" +
"}";
List<SearchHit> hits = executeSearch(Detector.DETECTORS_INDEX, request);
SearchHit hit = hits.get(0);

String monitorId = ((List<String>) ((Map<String, Object>) hit.getSourceAsMap().get("detector")).get("monitor_id")).get(0);

indexDoc(index1, "1", randomDoc());
indexDoc(index2, "1", randomDoc());

Response executeResponse = executeAlertingMonitor(monitorId, Collections.emptyMap());
Map<String, Object> executeResults = entityAsMap(executeResponse);

int noOfSigmaRuleMatches = ((List<Map<String, Object>>) ((Map<String, Object>) executeResults.get("input_results")).get("results")).get(0).size();
Assert.assertEquals(5, noOfSigmaRuleMatches);
List<Map<String, Object>> results = ((List<Map<String, Object>>) ((Map<String, Object>) executeResults.get("input_results")).get("results"));
List<Object> matchedDocs = (List<Object>) (results.get(0)).values().iterator().next();
assertTrue(matchedDocs.get(0).equals("1|windows-1"));
assertTrue(matchedDocs.get(1).equals("1|windows-2"));

// Check findings
Map<String, String> params = new HashMap<>();
params.put("detector_id", createdId);
Response getFindingsResponse = makeRequest(client(), "GET", SecurityAnalyticsPlugin.FINDINGS_BASE_URI + "/_search", params, null);
Map<String, Object> getFindingsBody = entityAsMap(getFindingsResponse);
assertNotNull(getFindingsBody);
Assert.assertEquals(2, getFindingsBody.get("total_findings"));
List<?> findings = (List<?>) getFindingsBody.get("findings");
Assert.assertEquals(findings.size(), 2);
}

public void testCreatingADetectorWithIndexNotExists() throws IOException {
Detector detector = randomDetectorWithTriggers(getRandomPrePackagedRules(), List.of(new DetectorTrigger(null, "test-trigger", "1", List.of(randomDetectorType()), List.of(), List.of(), List.of(), List.of())));

Expand Down

0 comments on commit 163d7ce

Please sign in to comment.