-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
add joins, insertion & search to correlation engine #7771
Conversation
Gradle Check (Jenkins) Run Completed with:
|
This PR is stalled because it has been open for 30 days with no activity. Remove stalled label or comment or this will be closed in 7 days. |
Gradle Check (Jenkins) Run Completed with:
|
Signed-off-by: Subhobrata Dey <sbcd90@gmail.com>
❌ Gradle check result for 4b4a92f: FAILURE Please examine the workflow log, locate, and copy-paste the failure(s) below, then iterate to green. Is the failure a flaky test unrelated to your change? |
Signed-off-by: Subhobrata Dey <sbcd90@gmail.com>
…ject#9956) Fix is pending in opensearch-project#7771, but that PR may take some time to land in main so muting for the time being. Signed-off-by: Andrew Ross <andrross@amazon.com> Signed-off-by: Shivansh Arora <hishiv@amazon.com>
/** | ||
* Rest action for indexing an event and its correlations | ||
*/ | ||
public class RestIndexCorrelationAction extends BaseRestHandler { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do these actions support timeouts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added search timeouts to these actions.
client.search(searchRequest, new ActionListener<>() { | ||
@Override | ||
public void onResponse(SearchResponse response) { | ||
if (response.isTimedOut()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it can be 429s as well
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added check for 429
int idx = 0; | ||
for (MultiSearchResponse.Item response : responses) { | ||
if (response.isFailure()) { | ||
log.error("error:", response.getFailure()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add more details to log message.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
fixed this.
* | ||
* @opensearch.internal | ||
*/ | ||
public class TransportIndexCorrelationAction extends HandledTransportAction<IndexCorrelationRequest, IndexCorrelationResponse> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we should move the core logic to EventsCorrelationManager or something of that sort.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Applicable for all transport classes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can address this in a follow-up pr.
if (response.getStatus().equals(RestStatus.OK)) { | ||
onOperation(true, new HashMap<>()); | ||
} else { | ||
onFailures(new OpenSearchStatusException("Failed to store correlations", RestStatus.INTERNAL_SERVER_ERROR)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do you need specific error handling here when it failed to index vectors? lets say why would it fail with 4xx (besides 429), Also for intermittent error in case it did index but the response failed, is that an issue?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
currently, RestStatus.OK -> success
, any other status -> failures
. This is returned by a custom transport action.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
i can enhance error handling in a follow-up pr.
List<CorrelationQuery> correlationQueries = indexQueriesEntry.getValue(); | ||
|
||
// assuming all queries belonging to an index use the same timestamp field. | ||
String timestampField = correlationQueries.get(0).getTimestampField(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do you want to validate here the assumption for timestamp field
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added validation.
continue; | ||
} | ||
|
||
Iterator<SearchHit> searchHits = response.getResponse().getHits().iterator(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this may not be all results right as you are not setting size param
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes. pagination is missing. can address this in follow-up prs.
searchRequest.indices(index); | ||
searchRequest.source(searchSourceBuilder); | ||
|
||
client.search(searchRequest, new ActionListener<>() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
multiple search requests are chained as PiT is not being used, can these results become inconsistent as they will be executed against different segments?
onFailures(new OpenSearchStatusException(response.toString(), RestStatus.REQUEST_TIMEOUT)); | ||
} | ||
|
||
Iterator<SearchHit> hits = response.getHits().iterator(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is there any limits on the no. of hits?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
10000
is default limit.
}); | ||
} | ||
|
||
private void prepRulesForCorrelatedEventsGeneration(String index, String event, List<CorrelationRule> correlationRules) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
there are very long method, can break into multiple small private methods
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can address this refactoring in a follow-up pr.
@Override | ||
protected void doExecute(Task task, IndexCorrelationRequest request, ActionListener<IndexCorrelationResponse> listener) { | ||
AsyncIndexCorrelationAction asyncAction = new AsyncIndexCorrelationAction(request, listener); | ||
asyncAction.start(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These are heavy weight transport actions chaining multiple queries as well. In which threadpool they would run. They shouldn't run on transport threadpool.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
today, all of them run in transport threadpool.
* | ||
* @opensearch.internal | ||
*/ | ||
public class TransportSearchCorrelatedEventsAction extends HandledTransportAction< |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are there any stats planned for CorrelatedEvents Actions?
Signed-off-by: Subhobrata Dey <sbcd90@gmail.com>
❕ Gradle check result for 46dafcb: UNSTABLE Please review all flaky tests that succeeded after retry and create an issue if one does not already exist to track the flaky failure. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I do think a correlation engine is useful and believe that the OpenSearch Project should deliver this feature. However, I remain unconvinced that the OpenSearch repository maintainers are the right people to take ownership of this code base. I personally do not have the bandwidth to build up the domain expertise required to properly review this code, and the fact that this PR has languished for over a year suggests that the case is similar for other maintainers. Since this feature is being architected as a plugin, I don't think there is any technical reason this code needs to live inside the OpenSearch repository. I also believe there are significant advantages to being able to iterate and evolve at its own pace, without being coupled to all the bottlenecks and pain points (flaky tests) that exist in the OpenSearch repository. Also note that a convention has been recently codified that plugins should be developed in their own repository as opposed to inside the OpenSearch repository. If another maintainer wants to make the case that this is the right place for this feature, then I will happily withdraw my objection.
I agree with @andrross, should we close this? |
@dblock, @andrross , @shwetathareja , @reta I understand maintainers are not permanent and there will be new maintainers and ideas, however as an open source project, what is the guiding principle for these kind of changes. The convention says discuss it during RFC stage and we duly followed the process. This would be a great precedent for a non-maintainer to contribute and discuss the feature and benefits and qualify to become a core maintainer or we will always be restricting contributions and not grow the project. |
@praveensameneni the OpenSearch ecosystem evolves fast and along the way the things that do work or don't become more apparent (plus new challenges always come in). The recently published guidelines (which we did not have a year ago) are trying to touch upon where new plugins belong. I agree with @andrross point and @dblock that it should be out of core at this moment of time.
The contributor could become the maintainer on the plugin repo (and continue contribution to core to become a maintainer there as well), we have quite a few successful stories like that already. |
@reta While the guidelines were recently added, the discussion from last year when we created the RFC still holds true. I understand the guidelines around new plugins, which unfortunately is biased towards maintainers domain expertise and does not fully account for integrating dependencies in plugin ecosystem. Additionally, It would also be better to take into account the use case and work backwards from the problem it's trying to solve and the value add for the community. |
@praveensameneni I just want to reiterate that the need to take a dependency on a new plugin is true regardless of where this code lives. The OpenSearch Project publishes many plugins, some of which reside in this repo, some of which reside in other repos, and consumers need not know or care.
What has become apparent to me in this case is that the decision for the OpenSearch maintainers to take ownership of this code has not worked, as shown by this PR being open for over a year. The current state of the correlation engine is that some basic functionality has been committed in this repo on the main branch (I don't think enough is there to be practically useful) and none of it has been backported to 2.x so it has never been released. |
If it's a separate plugin repo outside of core repo, you take a dependency through common-utils, the plugin repo jar among others, however when shipped as a plugin (e.g. ActionPlugin, ScriptPlugin, ReloadablePlugin, ExtensiblePlugin) inside of core repo, we do not have to take an additional jar, common-utils dependency. Just to clarify, we will continue to support and maintain the code. |
@praveensameneni the plugins are distributed as ZIP archives (bundles), not a JAR files.The OpenSearch distribution comes with some plugins preinstalled / prebundled. What is the plugin repo jar you are referring to? |
Thank you @reta , @andrross for your insightful comments. |
We will take the PR and previously merged PR's into the new opensearch-correlations plugin repository. |
For posterity, I want to clarify a few points here. There are a couple different types of dependencies between plugins as far as I know.
I'm honestly not sure what category the correlation engine will fall into (thus far I haven't seen it defining an SPI interface so I would guess category 1), but the overall point stands that to a consumer plugin where the dependent plugin code lives should not be important. In both the k-NN and job-scheduler cases they are plugins defined in external repos. I'm not actually familiar with common-utils but I believe it is really just a common library for sharing code across multiple plugins. |
I'm going to close this PR as it looks like we have a path forward as a separate repository. As the feature matures and the code evolves, if the plugin model is no longer the right fit we can absolutely re-evaluate integrating the feature directly into the core, but for now I believe a separate repository is the right place. |
Description
This PR focusses on the
Join
handler,Insertion
handler & theSearch
handler components of the Events Correlation Engine. The component level architecture diagram can be found hereJoin Handler
The Join handler component determines immediate neighbors of a particular event using the Correlation Rules defined by the user for the log indices(or datastreams) they wish to correlate.
Here is a brief schematic class diagram of the Join Handler layer.
Insertion Handler
In the Insertion handler layer events are converted to k-dimensional vectors & are stored in the vector storage layer mentioned above along with their correlations.
Here is a brief schematic class diagram of the Insertion Handler layer.
Search Handler
The Search Handler layer allows user to specify a particular event, & then converts it to a k-dimensional vector & then uses it to query its neighboring eventswhich are actually its correlated events within a time window.
Here is a brief schematic class diagram of the Search Handler layer.
REST apis
The following are the REST apis introduced by this PR.
Correlate an event
Search Correlated Events for the input event
Related Issues
#6854
Check List
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license.
For more information on following Developer Certificate of Origin and signing off your commits, please check here.