From 172d58de0ec6f3a8e455a9033973b2f61fc77d87 Mon Sep 17 00:00:00 2001 From: Surya Sashank Nistala Date: Fri, 1 Mar 2024 17:28:05 -0800 Subject: [PATCH] Remove blocking calls and change threat intel feed flow to event driven (#871) * remove actionGet() and change threat intel feed flow to event driven Signed-off-by: Surya Sashank Nistala * fix javadocs Signed-off-by: Surya Sashank Nistala * revert try catch removals Signed-off-by: Surya Sashank Nistala * use action listener wrap() in detector threat intel code paths Signed-off-by: Surya Sashank Nistala * add try catch Signed-off-by: Surya Sashank Nistala --------- Signed-off-by: Surya Sashank Nistala --- .../DetectorThreatIntelService.java | 43 ++---- .../ThreatIntelFeedDataService.java | 145 +++++++----------- .../action/TransportPutTIFJobAction.java | 112 +++++++------- .../threatIntel/common/TIFLockService.java | 50 ++---- .../jobscheduler/TIFJobParameterService.java | 94 ++++++------ .../jobscheduler/TIFJobRunner.java | 132 ++++++++-------- .../TransportIndexDetectorAction.java | 4 +- .../TransportSearchDetectorAction.java | 13 +- .../SecurityAnalyticsPluginTransportIT.java | 33 ---- .../common/ThreatIntelLockServiceTests.java | 9 +- 10 files changed, 265 insertions(+), 370 deletions(-) delete mode 100644 src/test/java/org/opensearch/securityanalytics/SecurityAnalyticsPluginTransportIT.java diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/DetectorThreatIntelService.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/DetectorThreatIntelService.java index df4971b66..e541ee36c 100644 --- a/src/main/java/org/opensearch/securityanalytics/threatIntel/DetectorThreatIntelService.java +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/DetectorThreatIntelService.java @@ -32,8 +32,6 @@ import java.util.Map; import java.util.Set; import java.util.UUID; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static org.opensearch.securityanalytics.model.Detector.DETECTORS_INDEX; @@ -121,35 +119,24 @@ public void createDocLevelQueryFromThreatIntel(List iocFieldL listener.onResponse(Collections.emptyList()); return; } - - CountDownLatch latch = new CountDownLatch(1); - threatIntelFeedDataService.getThreatIntelFeedData(new ActionListener<>() { - @Override - public void onResponse(List threatIntelFeedData) { - if (threatIntelFeedData.isEmpty()) { - listener.onResponse(Collections.emptyList()); - } else { - listener.onResponse( - createDocLevelQueriesFromThreatIntelList(iocFieldList, threatIntelFeedData, detector) - ); + threatIntelFeedDataService.getThreatIntelFeedData(ActionListener.wrap( + threatIntelFeedData -> { + if (threatIntelFeedData.isEmpty()) { + listener.onResponse(Collections.emptyList()); + } else { + listener.onResponse( + createDocLevelQueriesFromThreatIntelList(iocFieldList, threatIntelFeedData, detector) + ); + } + }, e -> { + log.error("Failed to get threat intel feeds for doc level query creation", e); + listener.onFailure(e); } - latch.countDown(); - } - - @Override - public void onFailure(Exception e) { - log.error("Failed to get threat intel feeds for doc level query creation", e); - listener.onFailure(e); - latch.countDown(); - } - }); - - latch.await(30, TimeUnit.SECONDS); - } catch (InterruptedException e) { - log.error("Failed to create doc level queries from threat intel feeds", e); + )); + } catch (Exception e) { + log.error("Failed to create doc level query from threat intel data", e); listener.onFailure(e); } - } private static String constructId(Detector detector, String iocType) { diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java index f37018ae5..b9d8aa3ea 100644 --- a/src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/ThreatIntelFeedDataService.java @@ -34,12 +34,12 @@ import org.opensearch.core.xcontent.ToXContent; import org.opensearch.core.xcontent.XContentBuilder; import org.opensearch.securityanalytics.model.ThreatIntelFeedData; +import org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings; import org.opensearch.securityanalytics.threatIntel.action.PutTIFJobAction; import org.opensearch.securityanalytics.threatIntel.action.PutTIFJobRequest; import org.opensearch.securityanalytics.threatIntel.action.ThreatIntelIndicesResponse; -import org.opensearch.securityanalytics.threatIntel.common.TIFMetadata; import org.opensearch.securityanalytics.threatIntel.common.StashedThreadContext; -import org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings; +import org.opensearch.securityanalytics.threatIntel.common.TIFMetadata; import org.opensearch.securityanalytics.threatIntel.jobscheduler.TIFJobParameterService; import org.opensearch.securityanalytics.util.IndexUtils; import org.opensearch.securityanalytics.util.SecurityAnalyticsException; @@ -56,7 +56,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.CountDownLatch; import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.stream.Collectors; @@ -104,21 +103,13 @@ public void getThreatIntelFeedData( ActionListener> listener ) { try { - String tifdIndex = getLatestIndexByCreationDate(); if (tifdIndex == null) { createThreatIntelFeedData(listener); } else { - SearchRequest searchRequest = new SearchRequest(tifdIndex); - searchRequest.source().size(9999); //TODO: convert to scroll - String finalTifdIndex = tifdIndex; - client.search(searchRequest, ActionListener.wrap(r -> listener.onResponse(ThreatIntelFeedDataUtils.getTifdList(r, xContentRegistry)), e -> { - log.error(String.format( - "Failed to fetch threat intel feed data from system index %s", finalTifdIndex), e); - listener.onFailure(e); - })); + fetchThreatIntelFeedDataFromIndex(tifdIndex, listener); } - } catch (InterruptedException e) { + } catch (Exception e) { log.error("Failed to get threat intel feed data", e); listener.onFailure(e); } @@ -150,21 +141,16 @@ public void createIndexIfNotExists(final String indexName, final ActionListener< .mapping(getIndexMapping()).timeout(clusterSettings.get(SecurityAnalyticsSettings.THREAT_INTEL_TIMEOUT)); StashedThreadContext.run( client, - () -> client.admin().indices().create(createIndexRequest, new ActionListener<>() { - @Override - public void onResponse(CreateIndexResponse response) { - if (response.isAcknowledged()) { - listener.onResponse(response); - } else { - onFailure(new OpenSearchStatusException("Threat intel feed index creation failed", RestStatus.INTERNAL_SERVER_ERROR)); - } - } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }) + () -> client.admin().indices().create(createIndexRequest, + ActionListener.wrap( + response -> { + if (response.isAcknowledged()) + listener.onResponse(response); + else + listener.onFailure(new OpenSearchStatusException("Threat intel feed index creation failed", RestStatus.INTERNAL_SERVER_ERROR)); + + }, listener::onFailure + )) ); } @@ -223,28 +209,20 @@ public void parseAndSaveThreatIntelFeedDataCSV( } bulkRequestList.add(bulkRequest); - GroupedActionListener bulkResponseListener = new GroupedActionListener<>(new ActionListener<>() { - @Override - public void onResponse(Collection bulkResponses) { - int idx = 0; - for (BulkResponse response: bulkResponses) { - BulkRequest request = bulkRequestList.get(idx); - if (response.hasFailures()) { - throw new OpenSearchException( - "error occurred while ingesting threat intel feed data in {} with an error {}", - StringUtils.join(request.getIndices()), - response.buildFailureMessage() - ); - } + GroupedActionListener bulkResponseListener = new GroupedActionListener<>(ActionListener.wrap(bulkResponses -> { + int idx = 0; + for (BulkResponse response : bulkResponses) { + BulkRequest request = bulkRequestList.get(idx); + if (response.hasFailures()) { + throw new OpenSearchException( + "error occurred while ingesting threat intel feed data in {} with an error {}", + StringUtils.join(request.getIndices()), + response.buildFailureMessage() + ); } - listener.onResponse(new ThreatIntelIndicesResponse(true, List.of(indexName))); } - - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }, bulkRequestList.size()); + listener.onResponse(new ThreatIntelIndicesResponse(true, List.of(indexName))); + }, listener::onFailure), bulkRequestList.size()); for (int i = 0; i < bulkRequestList.size(); ++i) { saveTifds(bulkRequestList.get(i), timeout, bulkResponseListener); @@ -291,52 +269,47 @@ public void deleteThreatIntelDataIndex(final List indices) { .prepareDelete(indices.toArray(new String[0])) .setIndicesOptions(IndicesOptions.LENIENT_EXPAND_OPEN_CLOSED_HIDDEN) .setTimeout(clusterSettings.get(SecurityAnalyticsSettings.THREAT_INTEL_TIMEOUT)) - .execute(new ActionListener<>() { - @Override - public void onResponse(AcknowledgedResponse response) { - if (response.isAcknowledged() == false) { - onFailure(new OpenSearchException("failed to delete data[{}]", String.join(",", indices))); - } - } - - @Override - public void onFailure(Exception e) { - log.error("unknown exception:", e); - } - }) + .execute(ActionListener.wrap( + response -> { + if (response.isAcknowledged() == false) { + log.error(new OpenSearchException("failed to delete threat intel feed index[{}]", + String.join(",", indices))); + } + }, e -> log.error("failed to delete threat intel feed index [{}]", e) + )) ); } - private void createThreatIntelFeedData(ActionListener> listener) throws InterruptedException { - CountDownLatch countDownLatch = new CountDownLatch(1); + private void createThreatIntelFeedData(ActionListener> listener) { client.execute( PutTIFJobAction.INSTANCE, new PutTIFJobRequest("feed_updater", clusterSettings.get(SecurityAnalyticsSettings.TIF_UPDATE_INTERVAL)), - new ActionListener<>() { - @Override - public void onResponse(AcknowledgedResponse acknowledgedResponse) { - log.debug("Acknowledged threat intel feed updater job created"); - countDownLatch.countDown(); - String tifdIndex = getLatestIndexByCreationDate(); - - SearchRequest searchRequest = new SearchRequest(tifdIndex); - searchRequest.source().size(9999); //TODO: convert to scroll - String finalTifdIndex = tifdIndex; - client.search(searchRequest, ActionListener.wrap(r -> listener.onResponse(ThreatIntelFeedDataUtils.getTifdList(r, xContentRegistry)), e -> { - log.error(String.format( - "Failed to fetch threat intel feed data from system index %s", finalTifdIndex), e); + ActionListener.wrap( + r -> { + if (false == r.isAcknowledged()) { + listener.onFailure(new Exception("Failed to acknowledge Put Tif job action")); + return; + } + log.debug("Acknowledged threat intel feed updater job created"); + String tifdIndex = getLatestIndexByCreationDate(); + fetchThreatIntelFeedDataFromIndex(tifdIndex, listener); + }, e -> { + log.debug("Failed to create threat intel feed updater job", e); listener.onFailure(e); - })); - } - - @Override - public void onFailure(Exception e) { - log.debug("Failed to create threat intel feed updater job", e); - countDownLatch.countDown(); - } - } + } + ) ); - countDownLatch.await(); + } + + private void fetchThreatIntelFeedDataFromIndex(String tifdIndex, ActionListener> listener) { + SearchRequest searchRequest = new SearchRequest(tifdIndex); + searchRequest.source().size(9999); //TODO: convert to scroll + String finalTifdIndex = tifdIndex; + client.search(searchRequest, ActionListener.wrap(r -> listener.onResponse(ThreatIntelFeedDataUtils.getTifdList(r, xContentRegistry)), e -> { + log.error(String.format( + "Failed to fetch threat intel feed data from system index %s", finalTifdIndex), e); + listener.onFailure(e); + })); } private String getIndexMapping() { diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/action/TransportPutTIFJobAction.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/action/TransportPutTIFJobAction.java index 393a0f102..a50beda35 100644 --- a/src/main/java/org/opensearch/securityanalytics/threatIntel/action/TransportPutTIFJobAction.java +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/action/TransportPutTIFJobAction.java @@ -41,7 +41,6 @@ public class TransportPutTIFJobAction extends HandledTransportAction listener) { - lockService.acquireLock(request.getName(), LOCK_DURATION_IN_SECONDS, ActionListener.wrap(lock -> { - if (lock == null) { - listener.onFailure( - new ConcurrentModificationException("another processor is holding a lock on the resource. Try again later") - ); - log.error("another processor is a lock, BAD_REQUEST error", RestStatus.BAD_REQUEST); - return; - } - try { - internalDoExecute(request, lock, listener); - } catch (Exception e) { - lockService.releaseLock(lock); - listener.onFailure(e); - log.error("listener failed when executing", e); - } - }, exception -> { - listener.onFailure(exception); - log.error("execution failed", exception); - })); + try { + lockService.acquireLock(request.getName(), LOCK_DURATION_IN_SECONDS, ActionListener.wrap(lock -> { + if (lock == null) { + listener.onFailure( + new ConcurrentModificationException("another processor is holding a lock on the resource. Try again later") + ); + log.error("another processor is a lock, BAD_REQUEST error", RestStatus.BAD_REQUEST); + return; + } + try { + internalDoExecute(request, lock, listener); + } catch (Exception e) { + lockService.releaseLock(lock); + listener.onFailure(e); + log.error("listener failed when executing", e); + } + }, exception -> { + listener.onFailure(exception); + log.error("execution failed", exception); + })); + } catch (Exception e) { + log.error("Failed to acquire lock for job", e); + listener.onFailure(e); + } } /** @@ -103,16 +106,21 @@ protected void internalDoExecute( final LockModel lock, final ActionListener listener ) { - StepListener createIndexStep = new StepListener<>(); - tifJobParameterService.createJobIndexIfNotExists(createIndexStep); - createIndexStep.whenComplete(v -> { - TIFJobParameter tifJobParameter = TIFJobParameter.Builder.build(request); - tifJobParameterService.saveTIFJobParameter(tifJobParameter, postIndexingTifJobParameter(tifJobParameter, lock, listener)); + StepListener createIndexStepListener = new StepListener<>(); + createIndexStepListener.whenComplete(v -> { + try { + TIFJobParameter tifJobParameter = TIFJobParameter.Builder.build(request); + tifJobParameterService.saveTIFJobParameter(tifJobParameter, postIndexingTifJobParameter(tifJobParameter, lock, listener)); + } catch (Exception e) { + listener.onFailure(e); + } }, exception -> { lockService.releaseLock(lock); log.error("failed to release lock", exception); listener.onFailure(exception); }); + tifJobParameterService.createJobIndexIfNotExists(createIndexStepListener); + } /** @@ -124,40 +132,30 @@ protected ActionListener postIndexingTifJobParameter( final LockModel lock, final ActionListener listener ) { - return new ActionListener<>() { - @Override - public void onResponse(final IndexResponse indexResponse) { - AtomicReference lockReference = new AtomicReference<>(lock); - createThreatIntelFeedData(tifJobParameter, lockService.getRenewLockRunnable(lockReference), new ActionListener<>() { - @Override - public void onResponse(ThreatIntelIndicesResponse threatIntelIndicesResponse) { - if (threatIntelIndicesResponse.isAcknowledged()) { - lockService.releaseLock(lockReference.get()); - listener.onResponse(new AcknowledgedResponse(true)); - } else { - onFailure(new OpenSearchStatusException("creation of threat intel feed data failed", RestStatus.INTERNAL_SERVER_ERROR)); - } - } - - @Override - public void onFailure(Exception e) { + return ActionListener.wrap( + indexResponse -> { + AtomicReference lockReference = new AtomicReference<>(lock); + createThreatIntelFeedData(tifJobParameter, lockService.getRenewLockRunnable(lockReference), ActionListener.wrap( + threatIntelIndicesResponse -> { + if (threatIntelIndicesResponse.isAcknowledged()) { + lockService.releaseLock(lockReference.get()); + listener.onResponse(new AcknowledgedResponse(true)); + } else { + listener.onFailure(new OpenSearchStatusException("creation of threat intel feed data failed", RestStatus.INTERNAL_SERVER_ERROR)); + } + }, listener::onFailure + )); + }, e -> { + lockService.releaseLock(lock); + if (e instanceof VersionConflictEngineException) { + log.error("tifJobParameter already exists"); + listener.onFailure(new ResourceAlreadyExistsException("tifJobParameter [{}] already exists", tifJobParameter.getName())); + } else { + log.error("Internal server error"); listener.onFailure(e); } - }); - } - - @Override - public void onFailure(final Exception e) { - lockService.releaseLock(lock); - if (e instanceof VersionConflictEngineException) { - log.error("tifJobParameter already exists"); - listener.onFailure(new ResourceAlreadyExistsException("tifJobParameter [{}] already exists", tifJobParameter.getName())); - } else { - log.error("Internal server error"); - listener.onFailure(e); } - } - }; + ); } protected void createThreatIntelFeedData(final TIFJobParameter tifJobParameter, final Runnable renewLock, final ActionListener listener) { diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/common/TIFLockService.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/common/TIFLockService.java index 7ec4e94f3..98abf040a 100644 --- a/src/main/java/org/opensearch/securityanalytics/threatIntel/common/TIFLockService.java +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/common/TIFLockService.java @@ -5,18 +5,8 @@ package org.opensearch.securityanalytics.threatIntel.common; -import static org.opensearch.securityanalytics.SecurityAnalyticsPlugin.JOB_INDEX_NAME; - - -import java.time.Instant; -import java.util.Optional; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicReference; - import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; - import org.opensearch.OpenSearchException; import org.opensearch.client.Client; import org.opensearch.cluster.service.ClusterService; @@ -25,6 +15,13 @@ import org.opensearch.jobscheduler.spi.utils.LockService; import org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings; +import java.time.Instant; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import static org.opensearch.securityanalytics.SecurityAnalyticsPlugin.JOB_INDEX_NAME; + /** * A wrapper of job scheduler's lock service */ @@ -48,52 +45,27 @@ public TIFLockService(final ClusterService clusterService, final Client client) this.lockService = new LockService(client, clusterService); } - /** - * Wrapper method of LockService#acquireLockWithId - * - * tif job uses its name as doc id in job scheduler. Therefore, we can use tif job name to acquire - * a lock on a tif job. - * - * @param tifJobName tifJobName to acquire lock on - * @param lockDurationSeconds the lock duration in seconds - * @param listener the listener - */ - public void acquireLock(final String tifJobName, final Long lockDurationSeconds, final ActionListener listener) { - lockService.acquireLockWithId(JOB_INDEX_NAME, lockDurationSeconds, tifJobName, listener); - } - /** * Synchronous method of #acquireLock * * @param tifJobName tifJobName to acquire lock on * @param lockDurationSeconds the lock duration in seconds - * @return lock model */ - public Optional acquireLock(final String tifJobName, final Long lockDurationSeconds) { + public void acquireLock(final String tifJobName, final Long lockDurationSeconds, ActionListener listener) { AtomicReference lockReference = new AtomicReference(); - CountDownLatch countDownLatch = new CountDownLatch(1); lockService.acquireLockWithId(JOB_INDEX_NAME, lockDurationSeconds, tifJobName, new ActionListener<>() { @Override public void onResponse(final LockModel lockModel) { lockReference.set(lockModel); - countDownLatch.countDown(); + listener.onResponse(lockReference.get()); } @Override public void onFailure(final Exception e) { - lockReference.set(null); - countDownLatch.countDown(); - log.error("aquiring lock failed", e); + log.error("Failed to acquire lock for tif job " + tifJobName, e); + listener.onFailure(e); } }); - - try { - countDownLatch.await(clusterService.getClusterSettings().get(SecurityAnalyticsSettings.THREAT_INTEL_TIMEOUT).getSeconds(), TimeUnit.SECONDS); - return Optional.ofNullable(lockReference.get()); - } catch (InterruptedException e) { - log.error("Waiting for the count down latch failed", e); - return Optional.empty(); - } } /** diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobParameterService.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobParameterService.java index de9bb5365..55387cb35 100644 --- a/src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobParameterService.java +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobParameterService.java @@ -9,6 +9,7 @@ import org.apache.logging.log4j.Logger; import org.opensearch.OpenSearchStatusException; import org.opensearch.ResourceAlreadyExistsException; +import org.opensearch.ResourceNotFoundException; import org.opensearch.action.DocWriteRequest; import org.opensearch.action.StepListener; import org.opensearch.action.admin.indices.create.CreateIndexRequest; @@ -84,6 +85,7 @@ public void onFailure(final Exception e) { stepListener.onResponse(null); return; } + log.error("Failed to create security analytics job index", e); stepListener.onFailure(e); } })); @@ -104,82 +106,72 @@ private String getIndexMapping() { /** * Update jobSchedulerParameter in an index {@code TIFJobExtension.JOB_INDEX_NAME} + * * @param jobSchedulerParameter the jobSchedulerParameter */ public void updateJobSchedulerParameter(final TIFJobParameter jobSchedulerParameter, final ActionListener listener) { jobSchedulerParameter.setLastUpdateTime(Instant.now()); StashedThreadContext.run(client, () -> { try { - if (listener != null) { - client.prepareIndex(SecurityAnalyticsPlugin.JOB_INDEX_NAME) - .setId(jobSchedulerParameter.getName()) - .setOpType(DocWriteRequest.OpType.INDEX) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) - .setSource(jobSchedulerParameter.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) - .execute(new ActionListener<>() { - @Override - public void onResponse(IndexResponse indexResponse) { - if (indexResponse.status().getStatus() >= 200 && indexResponse.status().getStatus() < 300) { - listener.onResponse(new ThreatIntelIndicesResponse(true, jobSchedulerParameter.getIndices())); - } else { - listener.onFailure(new OpenSearchStatusException("update of job scheduler parameter failed", RestStatus.INTERNAL_SERVER_ERROR)); - } + client.prepareIndex(SecurityAnalyticsPlugin.JOB_INDEX_NAME) + .setId(jobSchedulerParameter.getName()) + .setOpType(DocWriteRequest.OpType.INDEX) + .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) + .setSource(jobSchedulerParameter.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) + .execute(new ActionListener<>() { + @Override + public void onResponse(IndexResponse indexResponse) { + if (indexResponse.status().getStatus() >= 200 && indexResponse.status().getStatus() < 300) { + listener.onResponse(new ThreatIntelIndicesResponse(true, jobSchedulerParameter.getIndices())); + } else { + listener.onFailure(new OpenSearchStatusException("update of job scheduler parameter failed", RestStatus.INTERNAL_SERVER_ERROR)); } + } - @Override - public void onFailure(Exception e) { - listener.onFailure(e); - } - }); - } else { - client.prepareIndex(SecurityAnalyticsPlugin.JOB_INDEX_NAME) - .setId(jobSchedulerParameter.getName()) - .setOpType(DocWriteRequest.OpType.INDEX) - .setRefreshPolicy(WriteRequest.RefreshPolicy.IMMEDIATE) - .setSource(jobSchedulerParameter.toXContent(XContentFactory.jsonBuilder(), ToXContent.EMPTY_PARAMS)) - .execute().actionGet(); - } + @Override + public void onFailure(Exception e) { + listener.onFailure(e); + } + }); } catch (IOException e) { - throw new SecurityAnalyticsException("Runtime exception", RestStatus.INTERNAL_SERVER_ERROR, e); + log.error("failed to update job scheduler param for tif job", e); + listener.onFailure(e); } }); } /** * Get tif job from an index {@code TIFJobExtension.JOB_INDEX_NAME} + * * @param name the name of a tif job - * @return tif job - * @throws IOException exception */ - public TIFJobParameter getJobParameter(final String name) throws IOException { + public void getJobParameter(final String name, ActionListener listener) { GetRequest request = new GetRequest(SecurityAnalyticsPlugin.JOB_INDEX_NAME, name); - GetResponse response; - try { - response = StashedThreadContext.run(client, () -> client.get(request).actionGet(clusterSettings.get(SecurityAnalyticsSettings.THREAT_INTEL_TIMEOUT))); - if (response.isExists() == false) { - log.error("TIF job[{}] does not exist in an index[{}]", name, SecurityAnalyticsPlugin.JOB_INDEX_NAME); - return null; - } - } catch (IndexNotFoundException e) { - log.error("Index[{}] is not found", SecurityAnalyticsPlugin.JOB_INDEX_NAME); - return null; - } - - XContentParser parser = XContentHelper.createParser( - NamedXContentRegistry.EMPTY, - LoggingDeprecationHandler.INSTANCE, - response.getSourceAsBytesRef() - ); - return TIFJobParameter.PARSER.parse(parser, null); + StashedThreadContext.run(client, () -> client.get(request, ActionListener.wrap( + response -> { + if (response.isExists() == false) { + log.error("TIF job[{}] does not exist in an index[{}]", name, SecurityAnalyticsPlugin.JOB_INDEX_NAME); + listener.onFailure(new ResourceNotFoundException("name")); + } + XContentParser parser = XContentHelper.createParser( + NamedXContentRegistry.EMPTY, + LoggingDeprecationHandler.INSTANCE, + response.getSourceAsBytesRef() + ); + listener.onResponse(TIFJobParameter.PARSER.parse(parser, null)); + }, e -> { + log.error("Failed to fetch tif job document " + name, e); + listener.onFailure(e); + }))); } /** * Put tifJobParameter in an index {@code TIFJobExtension.JOB_INDEX_NAME} * * @param tifJobParameter the tifJobParameter - * @param listener the listener + * @param listener the listener */ - public void saveTIFJobParameter(final TIFJobParameter tifJobParameter, final ActionListener listener) { + public void saveTIFJobParameter(final TIFJobParameter tifJobParameter, final ActionListener listener) { tifJobParameter.setLastUpdateTime(Instant.now()); StashedThreadContext.run(client, () -> { try { diff --git a/src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobRunner.java b/src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobRunner.java index 13db6235d..1d8d8643f 100644 --- a/src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobRunner.java +++ b/src/main/java/org/opensearch/securityanalytics/threatIntel/jobscheduler/TIFJobRunner.java @@ -109,72 +109,82 @@ public void runJob(final ScheduledJobParameter jobParameter, final JobExecutionC * @param jobParameter job parameter */ protected Runnable updateJobRunner(final ScheduledJobParameter jobParameter) { - return () -> { - Optional lockModel = lockService.acquireLock( - jobParameter.getName(), - TIFLockService.LOCK_DURATION_IN_SECONDS - ); - if (lockModel.isEmpty()) { - log.error("Failed to update. Another processor is holding a lock for job parameter[{}]", jobParameter.getName()); - return; - } - - LockModel lock = lockModel.get(); - try { - updateJobParameter(jobParameter, lockService.getRenewLockRunnable(new AtomicReference<>(lock))); - } catch (Exception e) { - log.error("Failed to update job parameter[{}]", jobParameter.getName(), e); - } finally { - lockService.releaseLock(lock); - } - }; + return () -> lockService.acquireLock( + jobParameter.getName(), + TIFLockService.LOCK_DURATION_IN_SECONDS, + ActionListener.wrap(lock -> { + updateJobParameter(jobParameter, lockService.getRenewLockRunnable(new AtomicReference<>(lock)), + ActionListener.wrap( + r -> lockService.releaseLock(lock), + e -> { + log.error("Failed to update job parameter " + jobParameter.getName(), e); + lockService.releaseLock(lock); + } + )); + }, e -> { + log.error("Failed to update. Another processor is holding a lock for job parameter[{}]", jobParameter.getName()); + }) + ); } - protected void updateJobParameter(final ScheduledJobParameter jobParameter, final Runnable renewLock) throws IOException { - TIFJobParameter jobSchedulerParameter = jobSchedulerParameterService.getJobParameter(jobParameter.getName()); - /** - * If delete request comes while update task is waiting on a queue for other update tasks to complete, - * because update task for this jobSchedulerParameter didn't acquire a lock yet, delete request is processed. - * When it is this jobSchedulerParameter's turn to run, it will find that the jobSchedulerParameter is deleted already. - * Therefore, we stop the update process when data source does not exist. - */ - if (jobSchedulerParameter == null) { - log.info("Job parameter[{}] does not exist", jobParameter.getName()); - return; - } + protected void updateJobParameter(final ScheduledJobParameter jobParameter, final Runnable renewLock, ActionListener listener) { + jobSchedulerParameterService.getJobParameter(jobParameter.getName(), ActionListener.wrap( + jobSchedulerParameter -> { + /** + * If delete request comes while update task is waiting on a queue for other update tasks to complete, + * because update task for this jobSchedulerParameter didn't acquire a lock yet, delete request is processed. + * When it is this jobSchedulerParameter's turn to run, it will find that the jobSchedulerParameter is deleted already. + * Therefore, we stop the update process when data source does not exist. + */ + if (jobSchedulerParameter == null) { + log.info("Job parameter[{}] does not exist", jobParameter.getName()); + return; + } - if (TIFJobState.AVAILABLE.equals(jobSchedulerParameter.getState()) == false) { - log.error("Invalid jobSchedulerParameter state. Expecting {} but received {}", TIFJobState.AVAILABLE, jobSchedulerParameter.getState()); - jobSchedulerParameter.disable(); - jobSchedulerParameter.getUpdateStats().setLastFailedAt(Instant.now()); - jobSchedulerParameterService.updateJobSchedulerParameter(jobSchedulerParameter, null); - return; - } - // create new TIF data and delete old ones - List oldIndices = new ArrayList<>(jobSchedulerParameter.getIndices()); - jobSchedulerUpdateService.createThreatIntelFeedData(jobSchedulerParameter, renewLock, new ActionListener<>() { - @Override - public void onResponse(ThreatIntelIndicesResponse response) { - if (response.isAcknowledged()) { - List newFeedIndices = response.getIndices(); - jobSchedulerUpdateService.deleteAllTifdIndices(oldIndices, newFeedIndices); - if (false == newFeedIndices.isEmpty()) { - detectorThreatIntelService.updateDetectorsWithLatestThreatIntelRules(); + if (TIFJobState.AVAILABLE.equals(jobSchedulerParameter.getState()) == false) { + log.error("Invalid jobSchedulerParameter state. Expecting {} but received {}", TIFJobState.AVAILABLE, jobSchedulerParameter.getState()); + jobSchedulerParameter.disable(); + jobSchedulerParameter.getUpdateStats().setLastFailedAt(Instant.now()); + jobSchedulerParameterService.updateJobSchedulerParameter(jobSchedulerParameter, ActionListener.wrap( + r-> {}, e -> log.error("Failed to update job scheduler parameter in Threat intel feed update job") + )); } - } else { - log.error("Failed to update jobSchedulerParameter for {}", jobSchedulerParameter.getName()); - jobSchedulerParameter.getUpdateStats().setLastFailedAt(Instant.now()); - jobSchedulerParameterService.updateJobSchedulerParameter(jobSchedulerParameter, null); - } - } - @Override - public void onFailure(Exception e) { - log.error("Failed to update jobSchedulerParameter for {}", jobSchedulerParameter.getName(), e); - jobSchedulerParameter.getUpdateStats().setLastFailedAt(Instant.now()); - jobSchedulerParameterService.updateJobSchedulerParameter(jobSchedulerParameter, null); - } - }); + // create new TIF data and delete old ones + List oldIndices = new ArrayList<>(jobSchedulerParameter.getIndices()); + jobSchedulerUpdateService.createThreatIntelFeedData(jobSchedulerParameter, renewLock, new ActionListener<>() { + @Override + public void onResponse(ThreatIntelIndicesResponse response) { + if (response.isAcknowledged()) { + List newFeedIndices = response.getIndices(); + jobSchedulerUpdateService.deleteAllTifdIndices(oldIndices, newFeedIndices); + if (false == newFeedIndices.isEmpty()) { + detectorThreatIntelService.updateDetectorsWithLatestThreatIntelRules(); + } + } else { + log.error("Failed to update jobSchedulerParameter for {}", jobSchedulerParameter.getName()); + jobSchedulerParameter.getUpdateStats().setLastFailedAt(Instant.now()); + jobSchedulerParameterService.updateJobSchedulerParameter(jobSchedulerParameter, ActionListener.wrap( + r-> {}, e -> log.error("Failed to update job scheduler parameter in Threat intel feed update job") + )); + } + } + + @Override + public void onFailure(Exception e) { + log.error("Failed to update jobSchedulerParameter for {}", jobSchedulerParameter.getName(), e); + jobSchedulerParameter.getUpdateStats().setLastFailedAt(Instant.now()); + jobSchedulerParameterService.updateJobSchedulerParameter(jobSchedulerParameter, ActionListener.wrap( + r-> {}, ex -> log.error("Failed to update job scheduler parameter in Threat intel feed update job") + )); + } + }); + listener.onResponse(null); + }, + e -> { + listener.onFailure(e); + } + )); } } \ No newline at end of file diff --git a/src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java b/src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java index e6dea9947..883bf8ee7 100644 --- a/src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java +++ b/src/main/java/org/opensearch/securityanalytics/transport/TransportIndexDetectorAction.java @@ -717,7 +717,6 @@ private IndexMonitorRequest createDocLevelMonitorRequest(List private void addThreatIntelBasedDocLevelQueries(Detector detector, ActionListener> listener) { try { - if (detector.getThreatIntelEnabled()) { log.debug("threat intel enabled for detector {} . adding threat intel based doc level queries.", detector.getName()); List iocFieldsList = logTypeService.getIocFieldsList(detector.getDetectorType()); @@ -730,8 +729,7 @@ private void addThreatIntelBasedDocLevelQueries(Detector detector, ActionListene listener.onResponse(List.of()); } } catch (Exception e) { - // not failing detector creation if any fatal exception occurs during doc level query creation from threat intel feed data - log.error("Failed to convert threat intel feed to doc level query. Proceeding with detector creation", e); + log.error("Failed to add threat intel based doc level queries"); listener.onFailure(e); } } diff --git a/src/main/java/org/opensearch/securityanalytics/transport/TransportSearchDetectorAction.java b/src/main/java/org/opensearch/securityanalytics/transport/TransportSearchDetectorAction.java index 0643b34d7..3b7b36503 100644 --- a/src/main/java/org/opensearch/securityanalytics/transport/TransportSearchDetectorAction.java +++ b/src/main/java/org/opensearch/securityanalytics/transport/TransportSearchDetectorAction.java @@ -6,30 +6,25 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; - -import org.opensearch.core.action.ActionListener; import org.opensearch.action.search.SearchResponse; - import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.HandledTransportAction; -import org.opensearch.commons.authuser.User; import org.opensearch.client.Client; +import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; import org.opensearch.common.settings.Settings; -import org.opensearch.cluster.service.ClusterService; +import org.opensearch.commons.authuser.User; +import org.opensearch.core.action.ActionListener; import org.opensearch.core.xcontent.NamedXContentRegistry; import org.opensearch.securityanalytics.action.SearchDetectorAction; import org.opensearch.securityanalytics.action.SearchDetectorRequest; import org.opensearch.securityanalytics.settings.SecurityAnalyticsSettings; import org.opensearch.securityanalytics.threatIntel.action.TransportPutTIFJobAction; import org.opensearch.securityanalytics.util.DetectorIndices; -import org.opensearch.threadpool.ThreadPool; - import org.opensearch.tasks.Task; +import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.TransportService; -import java.util.concurrent.CountDownLatch; - import static org.opensearch.securityanalytics.util.DetectorUtils.getEmptySearchResponse; public class TransportSearchDetectorAction extends HandledTransportAction implements SecureTransportAction { diff --git a/src/test/java/org/opensearch/securityanalytics/SecurityAnalyticsPluginTransportIT.java b/src/test/java/org/opensearch/securityanalytics/SecurityAnalyticsPluginTransportIT.java deleted file mode 100644 index 688df56a0..000000000 --- a/src/test/java/org/opensearch/securityanalytics/SecurityAnalyticsPluginTransportIT.java +++ /dev/null @@ -1,33 +0,0 @@ -/* - * Copyright OpenSearch Contributors - * SPDX-License-Identifier: Apache-2.0 - */ -package org.opensearch.securityanalytics; - -import org.junit.Assert; -import org.opensearch.action.admin.cluster.node.info.NodeInfo; -import org.opensearch.action.admin.cluster.node.info.NodesInfoRequest; -import org.opensearch.action.admin.cluster.node.info.NodesInfoResponse; -import org.opensearch.action.admin.cluster.node.info.PluginsAndModules; -import org.opensearch.plugins.PluginInfo; -import org.opensearch.test.OpenSearchIntegTestCase; - -import java.util.List; -import java.util.function.Function; -import java.util.stream.Collectors; -import java.util.stream.Stream; - -/*public class SecurityAnalyticsPluginTransportIT extends OpenSearchIntegTestCase { - - public void testPluginsAreInstalled() { - NodesInfoRequest nodesInfoRequest = new NodesInfoRequest(); - nodesInfoRequest.addMetric(NodesInfoRequest.Metric.PLUGINS.metricName()); - NodesInfoResponse nodesInfoResponse = OpenSearchIntegTestCase.client().admin().cluster().nodesInfo(nodesInfoRequest) - .actionGet(); - List pluginInfos = nodesInfoResponse.getNodes().stream() - .flatMap((Function>) nodeInfo -> nodeInfo.getInfo(PluginsAndModules.class) - .getPluginInfos().stream()).collect(Collectors.toList()); - Assert.assertTrue(pluginInfos.stream().anyMatch(pluginInfo -> pluginInfo.getName() - .equals("opensearch-security-analytics"))); - } -}*/ \ No newline at end of file diff --git a/src/test/java/org/opensearch/securityanalytics/threatIntel/common/ThreatIntelLockServiceTests.java b/src/test/java/org/opensearch/securityanalytics/threatIntel/common/ThreatIntelLockServiceTests.java index 4b6423a3e..7a95e746f 100644 --- a/src/test/java/org/opensearch/securityanalytics/threatIntel/common/ThreatIntelLockServiceTests.java +++ b/src/test/java/org/opensearch/securityanalytics/threatIntel/common/ThreatIntelLockServiceTests.java @@ -41,9 +41,12 @@ public void testAcquireLock_whenValidInput_thenSucceed() { public void testAcquireLock_whenCalled_thenNotBlocked() { long expectedDurationInMillis = 1000; Instant before = Instant.now(); - assertTrue(threatIntelLockService.acquireLock(null, null).isEmpty()); - Instant after = Instant.now(); - assertTrue(after.toEpochMilli() - before.toEpochMilli() < expectedDurationInMillis); + threatIntelLockService.acquireLock(null, null, ActionListener.wrap( + r -> fail("Should not have been blocked"), e -> { + Instant after = Instant.now(); + assertTrue(after.toEpochMilli() - before.toEpochMilli() < expectedDurationInMillis); + } + )); } public void testReleaseLock_whenValidInput_thenSucceed() {