Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

FetchIOCService update IocStoreConfig with feedConfigId and IOC index names #1080

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.opensearch.securityanalytics.model.STIX2IOC;
import org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings;
import org.opensearch.securityanalytics.threatIntel.common.StashedThreadContext;
import org.opensearch.securityanalytics.threatIntel.model.DefaultIocStoreConfig;
import org.opensearch.securityanalytics.threatIntel.model.SATIFSourceConfig;

import java.io.ByteArrayOutputStream;
Expand Down Expand Up @@ -111,10 +112,11 @@ public void storeIOCs(Map<IOC, UpdateAction> actionToIOCs) {
}

public void indexIocs(List<STIX2IOC> iocs) throws IOException {
// TODO @jowg, there seems to be a bug in SATIFSourceConfigManagementService.
// downloadAndSaveIOCs is called before indexTIFSourceConfig, which means the config doesn't have an ID to use when creating the system index to store IOCs.
// Testing using SaTifSourceConfigDto.getName() instead of .getId() for now.
String feedIndexName = initFeedIndex(saTifSourceConfig.getName());
String feedIndexName = initFeedIndex(saTifSourceConfig.getId());

// Add the created index to the IocStoreConfig
((DefaultIocStoreConfig) saTifSourceConfig.getIocStoreConfig()).getIocMapStore().putIfAbsent(saTifSourceConfig.getId(), new ArrayList<>());
((DefaultIocStoreConfig) saTifSourceConfig.getIocStoreConfig()).getIocMapStore().get(saTifSourceConfig.getId()).add(feedIndexName);

List<BulkRequest> bulkRequestList = new ArrayList<>();
BulkRequest bulkRequest = new BulkRequest();
Expand Down Expand Up @@ -150,13 +152,16 @@ public void indexIocs(List<STIX2IOC> iocs) throws IOException {
long duration = Duration.between(startTime, Instant.now()).toMillis();
STIX2IOCFetchService.STIX2IOCFetchResponse output = new STIX2IOCFetchService.STIX2IOCFetchResponse(iocs, duration);
baseListener.onResponse(output);
}, baseListener::onFailure), bulkRequestList.size());
}, e -> {
log.error("Failed to index IOCs for config {}", saTifSourceConfig.getId(), e);
baseListener.onFailure(e);
}), bulkRequestList.size());

for (BulkRequest req : bulkRequestList) {
try {
StashedThreadContext.run(client, () -> client.bulk(req, bulkResponseListener));
} catch (OpenSearchException e) {
log.error("Failed to save IOCs.", e);
log.error("Failed to save IOCs for config {}", saTifSourceConfig.getId(), e);
baseListener.onFailure(e);
}
}
Expand All @@ -175,7 +180,6 @@ public static String getFeedConfigIndexName(String feedSourceConfigId) {
return IOC_INDEX_NAME_TEMPLATE.replace(IOC_FEED_ID_PLACEHOLDER, feedSourceConfigId.toLowerCase(Locale.ROOT));
}

// TODO hurneyt change ActionResponse to more specific response once it's available
public String initFeedIndex(String feedSourceConfigId) {
String feedIndexName = getFeedConfigIndexName(feedSourceConfigId);
if (!feedIndexExists(feedIndexName)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ public STIX2IOCFetchService(Client client, ClusterService clusterService) {
batchSize = clusterService.getClusterSettings().get(SecurityAnalyticsSettings.BATCH_SIZE);
}

public void fetchIocs(SATIFSourceConfig saTifSourceConfig, ActionListener<STIX2IOCFetchResponse> listener) {
public void downloadAndIndexIOCs(SATIFSourceConfig saTifSourceConfig, ActionListener<STIX2IOCFetchResponse> listener) {
S3ConnectorConfig s3ConnectorConfig = new S3ConnectorConfig(
((S3Source) saTifSourceConfig.getSource()).getBucketName(),
((S3Source) saTifSourceConfig.getSource()).getObjectKey(),
Expand All @@ -76,8 +76,20 @@ public void fetchIocs(SATIFSourceConfig saTifSourceConfig, ActionListener<STIX2I
STIX2IOCFeedStore feedStore = new STIX2IOCFeedStore(client, clusterService, saTifSourceConfig, listener);
STIX2IOCConsumer consumer = new STIX2IOCConsumer(batchSize, feedStore, UpdateType.REPLACE);

s3Connector.load(consumer);
consumer.flushIOCs();
try {
s3Connector.load(consumer);
} catch (Exception e) {
log.error("Failed to download IOCs.", e);
listener.onFailure(e);
}

// TODO consider passing listener into the flush IOC function
try {
consumer.flushIOCs();
} catch (Exception e) {
log.error("Failed to flush IOCs queue.", e);
listener.onFailure(e);
}
}

public void validateS3ConnectorConfig(S3ConnectorConfig s3ConnectorConfig) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ public class DefaultIocStoreConfig extends IocStoreConfig implements Writeable,
private static final Logger log = LogManager.getLogger(DefaultIocStoreConfig.class);
public static final String DEFAULT_FIELD = "default";
public static final String IOC_MAP = "ioc_map";

// Maps the SATIFSourceConfig ID to the list of index/alias names
private final Map<String, List<String>> iocMapStore;

public DefaultIocStoreConfig(Map<String, List<String>> iocMapStore) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,6 @@ public void testRetrieveIOCsSuccessfully() throws IOException, InterruptedExcept


// Wait for feed to execute
// TODO @jowg, last_updated_time is null in responseBody, but last_refreshed_time is present.
// Can you clarify which should be used here?
String firstUpdatedTime = (String) ((Map<String, Object>)responseBody.get("tif_config")).get("last_refreshed_time");
waitUntil(() -> {
try {
Expand All @@ -376,10 +374,7 @@ public void testRetrieveIOCsSuccessfully() throws IOException, InterruptedExcept
}, 240, TimeUnit.SECONDS);

// Confirm IOCs were ingested to system index for the feed
// TODO @jowg, there seems to be a bug in SATIFSourceConfigManagementService.
// downloadAndSaveIOCs is called before indexTIFSourceConfig, which means the config doesn't have an ID to use when creating the system index to store IOCs.
// Testing using saTifSourceConfigDto.getName() instead of .getId() for now.
String indexName = STIX2IOCFeedStore.getFeedConfigIndexName(saTifSourceConfigDto.getName());
String indexName = STIX2IOCFeedStore.getFeedConfigIndexName(SaTifSourceConfigDto.getId());
String request = "{\n" +
" \"query\" : {\n" +
" \"match_all\":{\n" +
Expand Down
Loading