Skip to content

Commit

Permalink
Clean up empty IOC indices created by failed source configs (opensear…
Browse files Browse the repository at this point in the history
…ch-project#1267)

* cleanup empty iocs and lock

Signed-off-by: Joanne Wang <jowg@amazon.com>

* change action listener response

Signed-off-by: Joanne Wang <jowg@amazon.com>

---------

Signed-off-by: Joanne Wang <jowg@amazon.com>
  • Loading branch information
jowg-amazon authored Aug 23, 2024
1 parent 2e0ed56 commit 0920e47
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import org.opensearch.rest.RestRequest;
import org.opensearch.search.SearchHit;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.securityanalytics.commons.model.IOCType;
import org.opensearch.securityanalytics.model.STIX2IOC;
import org.opensearch.securityanalytics.model.STIX2IOCDto;
import org.opensearch.securityanalytics.services.STIX2IOCFetchService;
Expand Down Expand Up @@ -162,16 +161,17 @@ public void createIocAndTIFSourceConfig(
));
},
e -> {
log.error("Failed to download and save IOCs for threat intel source config [{}]", indexSaTifSourceConfigResponse.getId());
saTifSourceConfigService.deleteTIFSourceConfig(indexSaTifSourceConfigResponse, ActionListener.wrap(
log.error("Failed to download and save IOCs for threat intel source config [{}]", indexSaTifSourceConfigResponse.getId(), e);
// set isDeleted as true because we want to delete failed source configs regardless if threat intel monitor exists
deleteAllIocsAndSourceConfig(ActionListener.wrap(
deleteResponse -> {
log.debug("Successfully deleted threat intel source config [{}]", indexSaTifSourceConfigResponse.getId());
listener.onFailure(e);
}, ex -> {
log.error("Failed to delete threat intel source config [{}]", indexSaTifSourceConfigResponse.getId());
log.error("Failed to delete threat intel source config [{}]", indexSaTifSourceConfigResponse.getId(), ex);
listener.onFailure(ex);
}
));
), indexSaTifSourceConfigResponse, true);
})
);
}, e -> {
Expand Down Expand Up @@ -497,7 +497,7 @@ public void deleteTIFSourceConfig(
// Check if all threat intel monitors are deleted
saTifSourceConfigService.checkAndEnsureThreatIntelMonitorsDeleted(ActionListener.wrap(
isDeleted -> {
deleteAllIocsAndSourceConfig(saTifSourceConfigId, listener, saTifSourceConfig, isDeleted);
deleteAllIocsAndSourceConfig(listener, saTifSourceConfig, isDeleted);
}, e -> {
log.error("Failed to check if all threat intel monitors are deleted or if multiple threat intel source configs exist");
listener.onFailure(e);
Expand Down Expand Up @@ -654,11 +654,11 @@ private Integer numOfIndicesToDelete(Integer totalNumIndices, Integer totalNumIn
return 0;
}

private void deleteAllIocsAndSourceConfig(String saTifSourceConfigId, ActionListener<DeleteResponse> listener, SATIFSourceConfig saTifSourceConfig, Boolean isDeleted) {
private void deleteAllIocsAndSourceConfig(ActionListener<DeleteResponse> listener, SATIFSourceConfig saTifSourceConfig, Boolean isDeleted) {
if (isDeleted == false) {
listener.onFailure(new IllegalArgumentException("All threat intel monitors need to be deleted before deleting last threat intel source config"));
} else {
log.debug("All threat intel monitors are deleted or multiple threat intel source configs exist, can delete threat intel source config [{}]", saTifSourceConfigId);
log.debug("All threat intel monitors are deleted or multiple threat intel source configs exist, can delete threat intel source config [{}]", saTifSourceConfig.getId());
markSourceConfigAsAction(
saTifSourceConfig,
TIFJobState.DELETING,
Expand All @@ -672,16 +672,28 @@ private void deleteAllIocsAndSourceConfig(String saTifSourceConfigId, ActionList
}
saTifSourceConfigService.getClusterState(ActionListener.wrap(
clusterStateResponse -> {
Set<String> concreteIndices = SATIFSourceConfigService.getConcreteIndices(clusterStateResponse);
Set<String> concreteIndices;
if (false == iocIndexPatterns.isEmpty()) {
concreteIndices = SATIFSourceConfigService.getConcreteIndices(clusterStateResponse);
} else {
concreteIndices = new HashSet<>();
}
saTifSourceConfigService.deleteAllIocIndices(concreteIndices, false, ActionListener.wrap(
r -> {
log.debug("Successfully deleted all ioc indices");
saTifSourceConfigService.deleteTIFSourceConfig(updateSaTifSourceConfigResponse, ActionListener.wrap(
deleteResponse -> {
log.debug("Successfully deleted threat intel source config [{}]", updateSaTifSourceConfigResponse.getId());
listener.onResponse(deleteResponse);
saTifSourceConfigService.deleteJobSchedulerLockIfJobDisabled(updateSaTifSourceConfigResponse, ActionListener.wrap(
deleteLockResponse -> {
saTifSourceConfigService.deleteTIFSourceConfig(updateSaTifSourceConfigResponse, ActionListener.wrap(
deleteResponse -> {
log.debug("Successfully deleted threat intel source config [{}]", updateSaTifSourceConfigResponse.getId());
listener.onResponse(deleteResponse);
}, e -> {
log.error("Failed to delete threat intel source config [{}]", saTifSourceConfig.getId());
listener.onFailure(e);
}
));
}, e -> {
log.error("Failed to delete threat intel source config [{}]", saTifSourceConfigId);
log.error("Failed to delete threat intel job scheduler lock [{}]", saTifSourceConfig.getId());
listener.onFailure(e);
}
));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import java.util.Set;
import java.util.stream.Collectors;

import static org.opensearch.jobscheduler.spi.utils.LockService.LOCK_INDEX_NAME;
import static org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings.INDEX_TIMEOUT;
import static org.opensearch.securityanalytics.threatIntel.common.TIFJobState.AVAILABLE;
import static org.opensearch.securityanalytics.threatIntel.common.TIFJobState.REFRESHING;
Expand Down Expand Up @@ -362,7 +363,7 @@ public void deleteTIFSourceConfig(
client.delete(request, ActionListener.wrap(
deleteResponse -> {
if (deleteResponse.status().equals(RestStatus.OK)) {
log.debug("Deleted threat intel source config [{}] successfully", saTifSourceConfig.getId());
log.info("Deleted threat intel source config [{}] successfully", saTifSourceConfig.getId());
actionListener.onResponse(deleteResponse);
} else if (deleteResponse.status().equals(RestStatus.NOT_FOUND)) {
actionListener.onFailure(SecurityAnalyticsException.wrap(new OpenSearchStatusException(String.format(Locale.getDefault(), "Threat intel source config with id [{%s}] not found", saTifSourceConfig.getId()), RestStatus.NOT_FOUND)));
Expand All @@ -376,6 +377,45 @@ public void deleteTIFSourceConfig(
));
}

// Manually delete threat intel job scheduler lock if job is disabled
public void deleteJobSchedulerLockIfJobDisabled(
SATIFSourceConfig saTifSourceConfig,
final ActionListener<DeleteResponse> actionListener
) {
if (saTifSourceConfig.isEnabled()) {
actionListener.onResponse(null);
return;
}

// check to make sure the job scheduler lock index exists
if (clusterService.state().metadata().hasIndex(LOCK_INDEX_NAME) == false) {
actionListener.onResponse(null);
return;
}

String id = SecurityAnalyticsPlugin.JOB_INDEX_NAME + "-" + saTifSourceConfig.getId();
DeleteRequest request = new DeleteRequest(LOCK_INDEX_NAME, id)
.setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE)
.timeout(clusterSettings.get(INDEX_TIMEOUT));

client.delete(request, ActionListener.wrap(
deleteResponse -> {
if (deleteResponse.status().equals(RestStatus.OK)) {
log.info("Deleted threat intel job scheduler lock [{}] successfully", id);
actionListener.onResponse(deleteResponse);
} else if (deleteResponse.status().equals(RestStatus.NOT_FOUND)) {
log.info("Threat intel job scheduler lock with id [{}] not found", id);
actionListener.onResponse(deleteResponse);
} else {
actionListener.onFailure(SecurityAnalyticsException.wrap(new OpenSearchStatusException(String.format(Locale.getDefault(), "Failed to delete threat intel job scheduler lock with id [{%s}]", id), deleteResponse.status())));
}
}, e -> {
log.error("Failed to delete threat intel job scheduler lock with id [{}]", id);
actionListener.onFailure(e);
}
));
}

public void deleteAllIocIndices(Set<String> indicesToDelete, Boolean backgroundJob, ActionListener<AcknowledgedResponse> listener) {
if (indicesToDelete.isEmpty() == false) {
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest(indicesToDelete.toArray(new String[0]));
Expand All @@ -398,6 +438,8 @@ public void deleteAllIocIndices(Set<String> indicesToDelete, Boolean backgroundJ
}
)
);
} else if (listener != null) {
listener.onResponse(new AcknowledgedResponse(true));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.junit.Before;
import org.opensearch.client.Response;
import org.opensearch.client.ResponseException;
import org.opensearch.core.rest.RestStatus;
import org.opensearch.jobscheduler.spi.schedule.IntervalSchedule;
import org.opensearch.search.SearchHit;
import org.opensearch.securityanalytics.SecurityAnalyticsPlugin;
Expand Down Expand Up @@ -734,7 +735,7 @@ public void testWithNoIOCsToDownload() {
}
}

public void testWhenBucketObjectDoesNotExist() {
public void testWhenBucketObjectDoesNotExist() throws IOException {
// Only run tests when required system params are provided
if (!canRunTests) return;

Expand Down Expand Up @@ -779,13 +780,34 @@ public void testWhenBucketObjectDoesNotExist() {
true
);

Exception exception = assertThrows(ResponseException.class, () ->
makeRequest(client(), "POST", SecurityAnalyticsPlugin.THREAT_INTEL_SOURCE_URI, Collections.emptyMap(), toHttpEntity(saTifSourceConfigDto))
);

String expectedError = "{\"error\":{\"root_cause\":[{\"type\":\"no_such_key_exception\",\"reason\":\"The specified key does not exist.";
assertTrue("Exception contains unexpected message: " + exception.getMessage(), exception.getMessage().contains(expectedError));
try {
makeRequest(client(), "POST", SecurityAnalyticsPlugin.THREAT_INTEL_SOURCE_URI, Collections.emptyMap(), toHttpEntity(saTifSourceConfigDto));
} catch (ResponseException exception) {
assertEquals(RestStatus.NOT_FOUND, restStatus(exception.getResponse()));
String expectedError = "The specified key does not exist.";
assertTrue("Exception contains unexpected message: " + exception.getMessage(), exception.getMessage().contains(expectedError));
}
}

// ensure that source config was deleted
String request = "{\n" +
" \"query\" : {\n" +
" \"match_all\":{\n" +
" }\n" +
" }\n" +
"}";
List<SearchHit> hits = executeSearch(JOB_INDEX_NAME, request);
Assert.assertEquals(0, hits.size());

// ensure that ioc indices were deleted
request = "{\n" +
" \"query\" : {\n" +
" \"match_all\":{\n" +
" }\n" +
" }\n" +
"}";
hits = executeSearch(JOB_INDEX_NAME, request);
Assert.assertEquals(0, hits.size());
}

public void testWhenRoleArnIsEmpty() throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import java.util.Locale;
import java.util.Map;

import static org.opensearch.jobscheduler.spi.utils.LockService.LOCK_INDEX_NAME;
import static org.opensearch.securityanalytics.SecurityAnalyticsPlugin.JOB_INDEX_NAME;
import static org.opensearch.securityanalytics.services.STIX2IOCFeedStore.IOC_ALL_INDEX_PATTERN;
import static org.opensearch.securityanalytics.services.STIX2IOCFeedStore.getAllIocIndexPatternById;
Expand Down Expand Up @@ -718,6 +719,10 @@ public void testDeleteIocUploadSourceConfigAndAllIocs() throws IOException {
// ensure all iocs are deleted
hits = executeSearch(IOC_ALL_INDEX_PATTERN, request);
Assert.assertEquals(0, hits.size());

// ensure that lock is deleted
hits = executeSearch(LOCK_INDEX_NAME,request);
Assert.assertEquals(0, hits.size());
}

public void testRefreshIocUploadSourceConfigFailure() throws IOException {
Expand Down

0 comments on commit 0920e47

Please sign in to comment.