-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
x-pack/filebeat/input/httpjson: remove concurrency from response processing #36493
Conversation
…h action and control flow
…lisher satisfy sendStream
…ocessor.startProcessing
fe1454f
to
7606d19
Compare
7606d19
to
334b4ee
Compare
Pinging @elastic/security-external-integrations (Team:Security-External Integrations) |
This change fixes the flakey test in #34929. Applying the same (similar; actually sensibly readable this time) grain of sand as in the analysis and unskipping the test now passes, as expected since there can be no race now. diff --git a/x-pack/filebeat/input/httpjson/input_test.go b/x-pack/filebeat/input/httpjson/input_test.go
index e88a0a28d3..91ba8a1125 100644
--- a/x-pack/filebeat/input/httpjson/input_test.go
+++ b/x-pack/filebeat/input/httpjson/input_test.go
@@ -374,7 +374,7 @@ var testCases = []struct {
},
},
{
- skipReason: "flakey test - see https://github.com/elastic/beats/issues/34929",
+ // skipReason: "flakey test - see https://github.com/elastic/beats/issues/34929",
name: "first_event",
setupServer: func(t testing.TB, h http.HandlerFunc, config map[string]interface{}) {
diff --git a/x-pack/filebeat/input/httpjson/request.go b/x-pack/filebeat/input/httpjson/request.go
index 6a1d926ab4..3ed904f16d 100644
--- a/x-pack/filebeat/input/httpjson/request.go
+++ b/x-pack/filebeat/input/httpjson/request.go
@@ -16,6 +16,7 @@ import (
"reflect"
"strconv"
"strings"
+ "time"
"github.com/PaesslerAG/jsonpath"
@@ -741,6 +742,10 @@ func newPublisher(trCtx *transformContext, pub inputcursor.Publisher, publish bo
}
func (p *publisher) event(_ context.Context, msg mapstr.M) {
+ if p.n == 0 {
+ time.Sleep(10 * time.Millisecond)
+ }
+
if p.pub != nil {
event, err := makeEvent(msg)
if err != nil {
For completeness, the same change as here applied to main. diff --git a/x-pack/filebeat/input/httpjson/input_test.go b/x-pack/filebeat/input/httpjson/input_test.go
index e88a0a28d3..91ba8a1125 100644
--- a/x-pack/filebeat/input/httpjson/input_test.go
+++ b/x-pack/filebeat/input/httpjson/input_test.go
@@ -374,7 +374,7 @@ var testCases = []struct {
},
},
{
- skipReason: "flakey test - see https://github.com/elastic/beats/issues/34929",
+ // skipReason: "flakey test - see https://github.com/elastic/beats/issues/34929",
name: "first_event",
setupServer: func(t testing.TB, h http.HandlerFunc, config map[string]interface{}) {
diff --git a/x-pack/filebeat/input/httpjson/request.go b/x-pack/filebeat/input/httpjson/request.go
index f92d2944c7..2f93d5d391 100644
--- a/x-pack/filebeat/input/httpjson/request.go
+++ b/x-pack/filebeat/input/httpjson/request.go
@@ -16,6 +16,7 @@ import (
"reflect"
"strconv"
"strings"
+ "time"
"github.com/PaesslerAG/jsonpath"
@@ -706,6 +707,10 @@ func processAndPublishEvents(trCtx *transformContext, events <-chan maybeMsg, pu
continue
}
+ if n == 0 {
+ time.Sleep(10 * time.Millisecond)
+ }
+
if publish {
event, err := makeEvent(maybeMsg.msg)
if err != nil {
Reactivating this test will be in its own PR. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, Awesome to see the progress here!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, Awesome work so far. The refactor is looking great.
One question from my end. Right now during chain pagination events, we skip the first event (same as the legacy approach) because we have already processed it. Can we handle this more elegantly in future, for example using the response of the initial chain request (if pagination exists) to directly start from request/page 2 instead to performing the initial request again ?
@ShourieG That would be an issue to file, but it's out of scope for the work here. |
…essing (elastic#36493) Replace concurrent code and locks with sequential processing using handler interface types to simplify data flow. Rather that sending events/errors on a channel to process in a separate gorouting, have a handler that has a couple of methods to handle either events or error non-concurrently with the event construction. The process to refactor was to progressively convert the channel to implement the proposed handler interface, split the chan range function into the two handler methods and then remove the range instead calling the appropriate method within the generator. After this was done, the locks required to protect concurrent access of transformers were removed. goos: linux goarch: amd64 pkg: github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson cpu: Intel(R) Core(TM) i7-6700HQ CPU @ 2.60GHz │ old.bench │ new.bench │ │ sec/op │ sec/op vs base │ Input/simple_GET_request-8 354.0µ ± 1% 335.5µ ± 1% -5.21% (p=0.000 n=20) Input/simple_HTTPS_GET_request-8 3.085m ± 5% 3.015m ± 5% ~ (p=0.904 n=20) Input/request_honors_rate_limit-8 332.4µ ± 0% 325.9µ ± 1% -1.95% (p=0.001 n=20) Input/request_retries_when_failed-8 345.2µ ± 1% 337.4µ ± 1% -2.25% (p=0.002 n=20) Input/POST_request_with_body-8 356.4µ ± 1% 338.4µ ± 1% -5.06% (p=0.000 n=20) Input/repeated_POST_requests-8 103.5m ± 0% 103.4m ± 0% -0.16% (p=0.024 n=20) Input/split_by_json_objects_array-8 361.4µ ± 1% 340.8µ ± 2% -5.70% (p=0.000 n=19) Input/split_by_json_objects_array_with_keep_parent-8 361.7µ ± 1% 339.4µ ± 1% -6.17% (p=0.000 n=20) Input/split_on_empty_array_without_ignore_empty_value-8 339.1µ ± 1% 330.1µ ± 1% -2.63% (p=0.000 n=20) Input/split_on_empty_array_with_ignore_empty_value-8 5.000 ± 0% 5.001 ± 0% ~ (p=0.301 n=20) Input/split_on_null_field_with_ignore_empty_value_keeping_parent-8 337.0µ ± 1% 325.0µ ± 1% -3.58% (p=0.000 n=20) Input/split_on_empty_array_with_ignore_empty_value_keeping_parent-8 337.2µ ± 0% 323.8µ ± 1% -3.97% (p=0.000 n=20) Input/split_on_null_field_at_root_with_ignore_empty_value_keeping_parent-8 335.6µ ± 1% 323.0µ ± 0% -3.74% (p=0.000 n=20) Input/split_on_empty_array_at_root_with_ignore_empty_value_keeping_parent-8 337.4µ ± 1% 325.8µ ± 4% -3.44% (p=0.001 n=14+17) Input/nested_split-8 359.1µ ± 1% 334.6µ ± 0% -6.84% (p=0.000 n=20) Input/pagination-8 10.40m ± 2% 10.36m ± 1% ~ (p=0.698 n=20) Input/first_event-8 2.006 ± 0% 2.005 ± 0% -0.06% (p=0.017 n=20) Input/oauth2-8 579.0µ ± 1% 557.7µ ± 1% -3.68% (p=0.000 n=20) Input/request_transforms_can_access_state_from_previous_transforms-8 621.9µ ± 2% 590.3µ ± 1% -5.09% (p=0.000 n=20) Input/response_transforms_can't_access_request_state_from_previous_transforms-8 540.5µ ± 1% 512.1µ ± 0% -5.25% (p=0.000 n=20) Input/simple_Chain_GET_request-8 627.1µ ± 1% 585.1µ ± 1% -6.69% (p=0.000 n=20) Input/multiple_Chain_GET_request-8 907.1µ ± 2% 862.1µ ± 5% -4.96% (p=0.005 n=17) Input/date_cursor_while_using_chain-8 871.3µ ± 1% 823.0µ ± 0% -5.54% (p=0.000 n=20) Input/split_by_json_objects_array_in_chain-8 638.4µ ± 3% 592.8µ ± 2% -7.13% (p=0.000 n=20) Input/split_by_json_objects_array_with_keep_parent_in_chain-8 640.4µ ± 0% 593.6µ ± 1% -7.32% (p=0.000 n=20+18) Input/pagination_when_used_with_chaining-8 1.318m ± 3% 1.234m ± 3% -6.39% (p=0.001 n=20) Input/replace_with_clause_and_first_response_object-8 1.101m ± 1% 1.066m ± 4% ~ (p=0.054 n=20+19) Input/replace_with_clause_with_hardcoded_value_1-8 838.4µ ± 2% 805.3µ ± 0% -3.95% (p=0.000 n=19+20) Input/replace_with_clause_with_hardcoded_value_(no_dot_prefix)-8 842.3µ ± 1% 827.9µ ± 3% -1.71% (p=0.040 n=20) Input/replace_with_clause_with_hardcoded_value_(more_than_one_dot_prefix)-8 849.4µ ± 2% 859.1µ ± 3% ~ (p=0.923 n=20+19) Input/replace_with_clause_with_hardcoded_value_containing_'.'_(dots)-8 844.6µ ± 4% 831.2µ ± 10% ~ (p=0.055 n=18+20) Input/global_transform_context_separation_with_parent_last_response_object-8 1.940m ± 2% 1.860m ± 1% -4.12% (p=0.000 n=20) Input/cursor_value_is_updated_for_root_response_with_chaining_&_pagination-8 2.174m ± 2% 2.119m ± 1% -2.52% (p=0.005 n=20+19) Input/cursor_value_is_updated_for_root_response_with_chaining_&_pagination_along_with_split_operator-8 2.208m ± 1% 2.140m ± 0% -3.08% (p=0.000 n=19+20) Input/Test_simple_XML_decode-8 9.797m ± 11% 9.636m ± 1% -1.64% (p=0.012 n=18+20) Input/split_events_by_not_found-8 5.001 ± 0% geomean 1.428m 1.728m -3.63% │ old.bench │ new.bench │ │ B/op │ B/op vs base │ Input/simple_GET_request-8 99.76Ki ± 0% 99.16Ki ± 0% -0.61% (p=0.000 n=20) Input/simple_HTTPS_GET_request-8 184.1Ki ± 0% 183.3Ki ± 0% -0.45% (p=0.000 n=20) Input/request_honors_rate_limit-8 98.14Ki ± 0% 97.58Ki ± 0% -0.58% (p=0.000 n=20) Input/request_retries_when_failed-8 98.97Ki ± 0% 98.37Ki ± 0% -0.61% (p=0.000 n=20) Input/POST_request_with_body-8 100.7Ki ± 0% 100.1Ki ± 0% -0.60% (p=0.000 n=20) Input/repeated_POST_requests-8 136.1Ki ± 1% 135.2Ki ± 1% -0.70% (p=0.002 n=20) Input/split_by_json_objects_array-8 102.9Ki ± 0% 101.8Ki ± 0% -0.99% (p=0.000 n=19) Input/split_by_json_objects_array_with_keep_parent-8 103.6Ki ± 0% 102.6Ki ± 0% -1.01% (p=0.000 n=20) Input/split_on_empty_array_without_ignore_empty_value-8 99.27Ki ± 0% 98.68Ki ± 0% -0.60% (p=0.000 n=20) Input/split_on_empty_array_with_ignore_empty_value-8 221.2Ki ± 4% 227.4Ki ± 2% ~ (p=0.355 n=20) Input/split_on_null_field_with_ignore_empty_value_keeping_parent-8 99.29Ki ± 0% 98.71Ki ± 0% -0.59% (p=0.000 n=20) Input/split_on_empty_array_with_ignore_empty_value_keeping_parent-8 99.36Ki ± 0% 98.76Ki ± 0% -0.61% (p=0.000 n=20) Input/split_on_null_field_at_root_with_ignore_empty_value_keeping_parent-8 98.76Ki ± 0% 98.17Ki ± 0% -0.60% (p=0.000 n=20) Input/split_on_empty_array_at_root_with_ignore_empty_value_keeping_parent-8 98.82Ki ± 0% 98.22Ki ± 0% -0.61% (p=0.000 n=14+17) Input/nested_split-8 103.6Ki ± 0% 102.5Ki ± 0% -0.99% (p=0.000 n=20) Input/pagination-8 429.2Ki ± 0% 425.7Ki ± 0% -0.80% (p=0.000 n=20) Input/first_event-8 315.2Ki ± 1% 322.6Ki ± 4% ~ (p=0.512 n=20) Input/oauth2-8 124.4Ki ± 0% 123.8Ki ± 0% -0.53% (p=0.000 n=20) Input/request_transforms_can_access_state_from_previous_transforms-8 160.2Ki ± 0% 159.6Ki ± 0% -0.39% (p=0.000 n=20) Input/response_transforms_can't_access_request_state_from_previous_transforms-8 140.3Ki ± 0% 139.7Ki ± 0% -0.46% (p=0.000 n=20) Input/simple_Chain_GET_request-8 159.5Ki ± 0% 158.2Ki ± 0% -0.82% (p=0.000 n=20) Input/multiple_Chain_GET_request-8 214.8Ki ± 0% 215.2Ki ± 0% +0.18% (p=0.013 n=17) Input/date_cursor_while_using_chain-8 209.7Ki ± 0% 208.3Ki ± 0% -0.66% (p=0.000 n=20) Input/split_by_json_objects_array_in_chain-8 162.7Ki ± 0% 161.0Ki ± 0% -1.07% (p=0.000 n=20) Input/split_by_json_objects_array_with_keep_parent_in_chain-8 163.5Ki ± 0% 161.8Ki ± 0% -1.08% (p=0.000 n=20+18) Input/pagination_when_used_with_chaining-8 247.8Ki ± 0% 245.4Ki ± 0% -0.95% (p=0.000 n=20) Input/replace_with_clause_and_first_response_object-8 244.1Ki ± 0% 242.1Ki ± 0% -0.79% (p=0.000 n=20+19) Input/replace_with_clause_with_hardcoded_value_1-8 186.3Ki ± 0% 184.6Ki ± 0% -0.92% (p=0.000 n=19+20) Input/replace_with_clause_with_hardcoded_value_(no_dot_prefix)-8 188.8Ki ± 0% 187.2Ki ± 0% -0.87% (p=0.000 n=20) Input/replace_with_clause_with_hardcoded_value_(more_than_one_dot_prefix)-8 189.0Ki ± 0% 187.3Ki ± 0% -0.93% (p=0.000 n=20+19) Input/replace_with_clause_with_hardcoded_value_containing_'.'_(dots)-8 186.6Ki ± 0% 184.9Ki ± 0% -0.91% (p=0.000 n=18+20) Input/global_transform_context_separation_with_parent_last_response_object-8 357.2Ki ± 0% 354.4Ki ± 0% -0.79% (p=0.000 n=20) Input/cursor_value_is_updated_for_root_response_with_chaining_&_pagination-8 442.3Ki ± 0% 439.1Ki ± 0% -0.72% (p=0.000 n=20+19) Input/cursor_value_is_updated_for_root_response_with_chaining_&_pagination_along_with_split_operator-8 452.7Ki ± 0% 449.2Ki ± 0% -0.78% (p=0.000 n=19+20) Input/Test_simple_XML_decode-8 2.645Mi ± 0% 2.644Mi ± 0% -0.03% (p=0.000 n=18+20) Input/split_events_by_not_found-8 230.9Ki ± 2% geomean 178.2Ki 178.6Ki -0.51% │ old.bench │ new.bench │ │ allocs/op │ allocs/op vs base │ Input/simple_GET_request-8 631.0 ± 0% 624.0 ± 0% -1.11% (p=0.000 n=20) Input/simple_HTTPS_GET_request-8 1.715k ± 0% 1.708k ± 0% -0.41% (p=0.000 n=20) Input/request_honors_rate_limit-8 601.0 ± 0% 594.0 ± 0% -1.16% (p=0.000 n=20) Input/request_retries_when_failed-8 602.0 ± 0% 595.0 ± 0% -1.16% (p=0.000 n=20) Input/POST_request_with_body-8 640.0 ± 0% 633.0 ± 0% -1.09% (p=0.000 n=20) Input/repeated_POST_requests-8 919.5 ± 0% 905.0 ± 0% -1.58% (p=0.000 n=20) Input/split_by_json_objects_array-8 656.0 ± 0% 643.0 ± 0% -1.98% (p=0.000 n=19) Input/split_by_json_objects_array_with_keep_parent-8 666.0 ± 0% 653.0 ± 0% -1.95% (p=0.000 n=20) Input/split_on_empty_array_without_ignore_empty_value-8 612.0 ± 0% 605.0 ± 0% -1.14% (p=0.000 n=20) Input/split_on_empty_array_with_ignore_empty_value-8 1.516k ± 1% 1.508k ± 0% ~ (p=0.062 n=20) Input/split_on_null_field_with_ignore_empty_value_keeping_parent-8 610.0 ± 0% 603.0 ± 0% -1.15% (p=0.000 n=20) Input/split_on_empty_array_with_ignore_empty_value_keeping_parent-8 612.0 ± 0% 605.0 ± 0% -1.14% (p=0.000 n=20) Input/split_on_null_field_at_root_with_ignore_empty_value_keeping_parent-8 607.0 ± 0% 600.0 ± 0% -1.15% (p=0.000 n=20) Input/split_on_empty_array_at_root_with_ignore_empty_value_keeping_parent-8 609.0 ± 0% 602.0 ± 0% -1.15% (n=14+17) Input/nested_split-8 662.0 ± 0% 649.0 ± 0% -1.96% (p=0.000 n=20) Input/pagination-8 3.574k ± 0% 3.507k ± 0% -1.85% (p=0.000 n=20) Input/first_event-8 2.325k ± 0% 2.300k ± 0% -1.08% (p=0.000 n=20) Input/oauth2-8 862.0 ± 0% 855.0 ± 0% -0.81% (p=0.000 n=20) Input/request_transforms_can_access_state_from_previous_transforms-8 1.072k ± 0% 1.065k ± 0% -0.65% (p=0.000 n=20) Input/response_transforms_can't_access_request_state_from_previous_transforms-8 918.0 ± 0% 911.0 ± 0% -0.76% (p=0.000 n=20) Input/simple_Chain_GET_request-8 1.022k ± 0% 1.006k ± 0% -1.52% (p=0.000 n=20) Input/multiple_Chain_GET_request-8 1.344k ± 0% 1.332k ± 0% -0.89% (p=0.000 n=17) Input/date_cursor_while_using_chain-8 1.399k ± 0% 1.384k ± 0% -1.07% (p=0.000 n=20) Input/split_by_json_objects_array_in_chain-8 1.048k ± 0% 1.027k ± 0% -2.00% (p=0.000 n=20) Input/split_by_json_objects_array_with_keep_parent_in_chain-8 1.059k ± 0% 1.037k ± 0% -2.08% (p=0.000 n=20+18) Input/pagination_when_used_with_chaining-8 1.804k ± 0% 1.774k ± 0% -1.66% (p=0.000 n=20) Input/replace_with_clause_and_first_response_object-8 1.590k ± 0% 1.567k ± 0% -1.45% (n=20+19) Input/replace_with_clause_with_hardcoded_value_1-8 1.246k ± 0% 1.225k ± 0% -1.69% (p=0.000 n=19+20) Input/replace_with_clause_with_hardcoded_value_(no_dot_prefix)-8 1.274k ± 0% 1.253k ± 0% -1.65% (p=0.000 n=20) Input/replace_with_clause_with_hardcoded_value_(more_than_one_dot_prefix)-8 1.274k ± 0% 1.253k ± 0% -1.65% (n=20+19) Input/replace_with_clause_with_hardcoded_value_containing_'.'_(dots)-8 1.247k ± 0% 1.226k ± 0% -1.68% (n=18+20) Input/global_transform_context_separation_with_parent_last_response_object-8 2.762k ± 0% 2.719k ± 0% -1.56% (p=0.000 n=20) Input/cursor_value_is_updated_for_root_response_with_chaining_&_pagination-8 3.493k ± 0% 3.449k ± 0% -1.26% (p=0.000 n=20+19) Input/cursor_value_is_updated_for_root_response_with_chaining_&_pagination_along_with_split_operator-8 3.570k ± 0% 3.527k ± 0% -1.20% (p=0.000 n=19+20) Input/Test_simple_XML_decode-8 50.30k ± 0% 50.30k ± 0% -0.01% (p=0.000 n=18+20) Input/split_events_by_not_found-8 1.572k ± 1% geomean 1.235k 1.227k -1.29%
Proposed commit message
Replace concurrent code and locks with sequential processing using handler interface types to simplify data flow.
Post refactor clean up will happen in a follow-up PR.
Checklist
CHANGELOG.next.asciidoc
orCHANGELOG-developer.next.asciidoc
.Author's Checklist
How to test this PR locally
Related issues
Use cases
Screenshots
Logs
Performance impact
The change here is not intended to be a performance improvement, but rather to improve the maintainability and correctness of the code. However, it's worth checking that it does not do too much harm in that regard. The following is a comparison of the parent of this set of changes and the change here.
On mac there is little to no change in the cpu bench performance of the package after the change and a small but consistent improvement in the memory bench performance. A single cpu bench does show a significant improvement, and this is consistent over runs, but is not an important bench and is probably just a weirdness.
On linux (a more reliable platform for benchmarking in my experience) there is a similar reduction in memory cost, but with the superior reliability of benchmarking on linux the noise floor is lowered and it's possible to see an ~3.5% reduction in CPU cost after the change.