Skip to content

Commit

Permalink
x-pack/filebeat/input/httpjson: remove concurrency from response proc…
Browse files Browse the repository at this point in the history
…essing (elastic#36493)

Replace concurrent code and locks with sequential processing using handler
interface types to simplify data flow. Rather that sending events/errors on a
channel to process in a separate gorouting, have a handler that has a couple
of methods to handle either events or error non-concurrently with the event
construction.

The process to refactor was to progressively convert the channel to implement
the proposed handler interface, split the chan range function into the two
handler methods and then remove the range instead calling the appropriate
method within the generator.

After this was done, the locks required to protect concurrent access of
transformers were removed.

goos: linux
goarch: amd64
pkg: github.com/elastic/beats/v7/x-pack/filebeat/input/httpjson
cpu: Intel(R) Core(TM) i7-6700HQ CPU @ 2.60GHz
                                                                                                       │  old.bench   │               new.bench                │
                                                                                                       │    sec/op    │    sec/op     vs base                  │
Input/simple_GET_request-8                                                                               354.0µ ±  1%   335.5µ ±  1%  -5.21% (p=0.000 n=20)
Input/simple_HTTPS_GET_request-8                                                                         3.085m ±  5%   3.015m ±  5%       ~ (p=0.904 n=20)
Input/request_honors_rate_limit-8                                                                        332.4µ ±  0%   325.9µ ±  1%  -1.95% (p=0.001 n=20)
Input/request_retries_when_failed-8                                                                      345.2µ ±  1%   337.4µ ±  1%  -2.25% (p=0.002 n=20)
Input/POST_request_with_body-8                                                                           356.4µ ±  1%   338.4µ ±  1%  -5.06% (p=0.000 n=20)
Input/repeated_POST_requests-8                                                                           103.5m ±  0%   103.4m ±  0%  -0.16% (p=0.024 n=20)
Input/split_by_json_objects_array-8                                                                      361.4µ ±  1%   340.8µ ±  2%  -5.70% (p=0.000 n=19)
Input/split_by_json_objects_array_with_keep_parent-8                                                     361.7µ ±  1%   339.4µ ±  1%  -6.17% (p=0.000 n=20)
Input/split_on_empty_array_without_ignore_empty_value-8                                                  339.1µ ±  1%   330.1µ ±  1%  -2.63% (p=0.000 n=20)
Input/split_on_empty_array_with_ignore_empty_value-8                                                      5.000 ±  0%    5.001 ±  0%       ~ (p=0.301 n=20)
Input/split_on_null_field_with_ignore_empty_value_keeping_parent-8                                       337.0µ ±  1%   325.0µ ±  1%  -3.58% (p=0.000 n=20)
Input/split_on_empty_array_with_ignore_empty_value_keeping_parent-8                                      337.2µ ±  0%   323.8µ ±  1%  -3.97% (p=0.000 n=20)
Input/split_on_null_field_at_root_with_ignore_empty_value_keeping_parent-8                               335.6µ ±  1%   323.0µ ±  0%  -3.74% (p=0.000 n=20)
Input/split_on_empty_array_at_root_with_ignore_empty_value_keeping_parent-8                              337.4µ ±  1%   325.8µ ±  4%  -3.44% (p=0.001 n=14+17)
Input/nested_split-8                                                                                     359.1µ ±  1%   334.6µ ±  0%  -6.84% (p=0.000 n=20)
Input/pagination-8                                                                                       10.40m ±  2%   10.36m ±  1%       ~ (p=0.698 n=20)
Input/first_event-8                                                                                       2.006 ±  0%    2.005 ±  0%  -0.06% (p=0.017 n=20)
Input/oauth2-8                                                                                           579.0µ ±  1%   557.7µ ±  1%  -3.68% (p=0.000 n=20)
Input/request_transforms_can_access_state_from_previous_transforms-8                                     621.9µ ±  2%   590.3µ ±  1%  -5.09% (p=0.000 n=20)
Input/response_transforms_can't_access_request_state_from_previous_transforms-8                          540.5µ ±  1%   512.1µ ±  0%  -5.25% (p=0.000 n=20)
Input/simple_Chain_GET_request-8                                                                         627.1µ ±  1%   585.1µ ±  1%  -6.69% (p=0.000 n=20)
Input/multiple_Chain_GET_request-8                                                                       907.1µ ±  2%   862.1µ ±  5%  -4.96% (p=0.005 n=17)
Input/date_cursor_while_using_chain-8                                                                    871.3µ ±  1%   823.0µ ±  0%  -5.54% (p=0.000 n=20)
Input/split_by_json_objects_array_in_chain-8                                                             638.4µ ±  3%   592.8µ ±  2%  -7.13% (p=0.000 n=20)
Input/split_by_json_objects_array_with_keep_parent_in_chain-8                                            640.4µ ±  0%   593.6µ ±  1%  -7.32% (p=0.000 n=20+18)
Input/pagination_when_used_with_chaining-8                                                               1.318m ±  3%   1.234m ±  3%  -6.39% (p=0.001 n=20)
Input/replace_with_clause_and_first_response_object-8                                                    1.101m ±  1%   1.066m ±  4%       ~ (p=0.054 n=20+19)
Input/replace_with_clause_with_hardcoded_value_1-8                                                       838.4µ ±  2%   805.3µ ±  0%  -3.95% (p=0.000 n=19+20)
Input/replace_with_clause_with_hardcoded_value_(no_dot_prefix)-8                                         842.3µ ±  1%   827.9µ ±  3%  -1.71% (p=0.040 n=20)
Input/replace_with_clause_with_hardcoded_value_(more_than_one_dot_prefix)-8                              849.4µ ±  2%   859.1µ ±  3%       ~ (p=0.923 n=20+19)
Input/replace_with_clause_with_hardcoded_value_containing_'.'_(dots)-8                                   844.6µ ±  4%   831.2µ ± 10%       ~ (p=0.055 n=18+20)
Input/global_transform_context_separation_with_parent_last_response_object-8                             1.940m ±  2%   1.860m ±  1%  -4.12% (p=0.000 n=20)
Input/cursor_value_is_updated_for_root_response_with_chaining_&_pagination-8                             2.174m ±  2%   2.119m ±  1%  -2.52% (p=0.005 n=20+19)
Input/cursor_value_is_updated_for_root_response_with_chaining_&_pagination_along_with_split_operator-8   2.208m ±  1%   2.140m ±  0%  -3.08% (p=0.000 n=19+20)
Input/Test_simple_XML_decode-8                                                                           9.797m ± 11%   9.636m ±  1%  -1.64% (p=0.012 n=18+20)
Input/split_events_by_not_found-8                                                                                        5.001 ±  0%
geomean                                                                                                  1.428m         1.728m        -3.63%

                                                                                                       │  old.bench   │               new.bench                │
                                                                                                       │     B/op     │     B/op      vs base                  │
Input/simple_GET_request-8                                                                               99.76Ki ± 0%   99.16Ki ± 0%  -0.61% (p=0.000 n=20)
Input/simple_HTTPS_GET_request-8                                                                         184.1Ki ± 0%   183.3Ki ± 0%  -0.45% (p=0.000 n=20)
Input/request_honors_rate_limit-8                                                                        98.14Ki ± 0%   97.58Ki ± 0%  -0.58% (p=0.000 n=20)
Input/request_retries_when_failed-8                                                                      98.97Ki ± 0%   98.37Ki ± 0%  -0.61% (p=0.000 n=20)
Input/POST_request_with_body-8                                                                           100.7Ki ± 0%   100.1Ki ± 0%  -0.60% (p=0.000 n=20)
Input/repeated_POST_requests-8                                                                           136.1Ki ± 1%   135.2Ki ± 1%  -0.70% (p=0.002 n=20)
Input/split_by_json_objects_array-8                                                                      102.9Ki ± 0%   101.8Ki ± 0%  -0.99% (p=0.000 n=19)
Input/split_by_json_objects_array_with_keep_parent-8                                                     103.6Ki ± 0%   102.6Ki ± 0%  -1.01% (p=0.000 n=20)
Input/split_on_empty_array_without_ignore_empty_value-8                                                  99.27Ki ± 0%   98.68Ki ± 0%  -0.60% (p=0.000 n=20)
Input/split_on_empty_array_with_ignore_empty_value-8                                                     221.2Ki ± 4%   227.4Ki ± 2%       ~ (p=0.355 n=20)
Input/split_on_null_field_with_ignore_empty_value_keeping_parent-8                                       99.29Ki ± 0%   98.71Ki ± 0%  -0.59% (p=0.000 n=20)
Input/split_on_empty_array_with_ignore_empty_value_keeping_parent-8                                      99.36Ki ± 0%   98.76Ki ± 0%  -0.61% (p=0.000 n=20)
Input/split_on_null_field_at_root_with_ignore_empty_value_keeping_parent-8                               98.76Ki ± 0%   98.17Ki ± 0%  -0.60% (p=0.000 n=20)
Input/split_on_empty_array_at_root_with_ignore_empty_value_keeping_parent-8                              98.82Ki ± 0%   98.22Ki ± 0%  -0.61% (p=0.000 n=14+17)
Input/nested_split-8                                                                                     103.6Ki ± 0%   102.5Ki ± 0%  -0.99% (p=0.000 n=20)
Input/pagination-8                                                                                       429.2Ki ± 0%   425.7Ki ± 0%  -0.80% (p=0.000 n=20)
Input/first_event-8                                                                                      315.2Ki ± 1%   322.6Ki ± 4%       ~ (p=0.512 n=20)
Input/oauth2-8                                                                                           124.4Ki ± 0%   123.8Ki ± 0%  -0.53% (p=0.000 n=20)
Input/request_transforms_can_access_state_from_previous_transforms-8                                     160.2Ki ± 0%   159.6Ki ± 0%  -0.39% (p=0.000 n=20)
Input/response_transforms_can't_access_request_state_from_previous_transforms-8                          140.3Ki ± 0%   139.7Ki ± 0%  -0.46% (p=0.000 n=20)
Input/simple_Chain_GET_request-8                                                                         159.5Ki ± 0%   158.2Ki ± 0%  -0.82% (p=0.000 n=20)
Input/multiple_Chain_GET_request-8                                                                       214.8Ki ± 0%   215.2Ki ± 0%  +0.18% (p=0.013 n=17)
Input/date_cursor_while_using_chain-8                                                                    209.7Ki ± 0%   208.3Ki ± 0%  -0.66% (p=0.000 n=20)
Input/split_by_json_objects_array_in_chain-8                                                             162.7Ki ± 0%   161.0Ki ± 0%  -1.07% (p=0.000 n=20)
Input/split_by_json_objects_array_with_keep_parent_in_chain-8                                            163.5Ki ± 0%   161.8Ki ± 0%  -1.08% (p=0.000 n=20+18)
Input/pagination_when_used_with_chaining-8                                                               247.8Ki ± 0%   245.4Ki ± 0%  -0.95% (p=0.000 n=20)
Input/replace_with_clause_and_first_response_object-8                                                    244.1Ki ± 0%   242.1Ki ± 0%  -0.79% (p=0.000 n=20+19)
Input/replace_with_clause_with_hardcoded_value_1-8                                                       186.3Ki ± 0%   184.6Ki ± 0%  -0.92% (p=0.000 n=19+20)
Input/replace_with_clause_with_hardcoded_value_(no_dot_prefix)-8                                         188.8Ki ± 0%   187.2Ki ± 0%  -0.87% (p=0.000 n=20)
Input/replace_with_clause_with_hardcoded_value_(more_than_one_dot_prefix)-8                              189.0Ki ± 0%   187.3Ki ± 0%  -0.93% (p=0.000 n=20+19)
Input/replace_with_clause_with_hardcoded_value_containing_'.'_(dots)-8                                   186.6Ki ± 0%   184.9Ki ± 0%  -0.91% (p=0.000 n=18+20)
Input/global_transform_context_separation_with_parent_last_response_object-8                             357.2Ki ± 0%   354.4Ki ± 0%  -0.79% (p=0.000 n=20)
Input/cursor_value_is_updated_for_root_response_with_chaining_&_pagination-8                             442.3Ki ± 0%   439.1Ki ± 0%  -0.72% (p=0.000 n=20+19)
Input/cursor_value_is_updated_for_root_response_with_chaining_&_pagination_along_with_split_operator-8   452.7Ki ± 0%   449.2Ki ± 0%  -0.78% (p=0.000 n=19+20)
Input/Test_simple_XML_decode-8                                                                           2.645Mi ± 0%   2.644Mi ± 0%  -0.03% (p=0.000 n=18+20)
Input/split_events_by_not_found-8                                                                                       230.9Ki ± 2%
geomean                                                                                                  178.2Ki        178.6Ki       -0.51%

                                                                                                       │  old.bench  │               new.bench               │
                                                                                                       │  allocs/op  │  allocs/op   vs base                  │
Input/simple_GET_request-8                                                                                631.0 ± 0%    624.0 ± 0%  -1.11% (p=0.000 n=20)
Input/simple_HTTPS_GET_request-8                                                                         1.715k ± 0%   1.708k ± 0%  -0.41% (p=0.000 n=20)
Input/request_honors_rate_limit-8                                                                         601.0 ± 0%    594.0 ± 0%  -1.16% (p=0.000 n=20)
Input/request_retries_when_failed-8                                                                       602.0 ± 0%    595.0 ± 0%  -1.16% (p=0.000 n=20)
Input/POST_request_with_body-8                                                                            640.0 ± 0%    633.0 ± 0%  -1.09% (p=0.000 n=20)
Input/repeated_POST_requests-8                                                                            919.5 ± 0%    905.0 ± 0%  -1.58% (p=0.000 n=20)
Input/split_by_json_objects_array-8                                                                       656.0 ± 0%    643.0 ± 0%  -1.98% (p=0.000 n=19)
Input/split_by_json_objects_array_with_keep_parent-8                                                      666.0 ± 0%    653.0 ± 0%  -1.95% (p=0.000 n=20)
Input/split_on_empty_array_without_ignore_empty_value-8                                                   612.0 ± 0%    605.0 ± 0%  -1.14% (p=0.000 n=20)
Input/split_on_empty_array_with_ignore_empty_value-8                                                     1.516k ± 1%   1.508k ± 0%       ~ (p=0.062 n=20)
Input/split_on_null_field_with_ignore_empty_value_keeping_parent-8                                        610.0 ± 0%    603.0 ± 0%  -1.15% (p=0.000 n=20)
Input/split_on_empty_array_with_ignore_empty_value_keeping_parent-8                                       612.0 ± 0%    605.0 ± 0%  -1.14% (p=0.000 n=20)
Input/split_on_null_field_at_root_with_ignore_empty_value_keeping_parent-8                                607.0 ± 0%    600.0 ± 0%  -1.15% (p=0.000 n=20)
Input/split_on_empty_array_at_root_with_ignore_empty_value_keeping_parent-8                               609.0 ± 0%    602.0 ± 0%  -1.15% (n=14+17)
Input/nested_split-8                                                                                      662.0 ± 0%    649.0 ± 0%  -1.96% (p=0.000 n=20)
Input/pagination-8                                                                                       3.574k ± 0%   3.507k ± 0%  -1.85% (p=0.000 n=20)
Input/first_event-8                                                                                      2.325k ± 0%   2.300k ± 0%  -1.08% (p=0.000 n=20)
Input/oauth2-8                                                                                            862.0 ± 0%    855.0 ± 0%  -0.81% (p=0.000 n=20)
Input/request_transforms_can_access_state_from_previous_transforms-8                                     1.072k ± 0%   1.065k ± 0%  -0.65% (p=0.000 n=20)
Input/response_transforms_can't_access_request_state_from_previous_transforms-8                           918.0 ± 0%    911.0 ± 0%  -0.76% (p=0.000 n=20)
Input/simple_Chain_GET_request-8                                                                         1.022k ± 0%   1.006k ± 0%  -1.52% (p=0.000 n=20)
Input/multiple_Chain_GET_request-8                                                                       1.344k ± 0%   1.332k ± 0%  -0.89% (p=0.000 n=17)
Input/date_cursor_while_using_chain-8                                                                    1.399k ± 0%   1.384k ± 0%  -1.07% (p=0.000 n=20)
Input/split_by_json_objects_array_in_chain-8                                                             1.048k ± 0%   1.027k ± 0%  -2.00% (p=0.000 n=20)
Input/split_by_json_objects_array_with_keep_parent_in_chain-8                                            1.059k ± 0%   1.037k ± 0%  -2.08% (p=0.000 n=20+18)
Input/pagination_when_used_with_chaining-8                                                               1.804k ± 0%   1.774k ± 0%  -1.66% (p=0.000 n=20)
Input/replace_with_clause_and_first_response_object-8                                                    1.590k ± 0%   1.567k ± 0%  -1.45% (n=20+19)
Input/replace_with_clause_with_hardcoded_value_1-8                                                       1.246k ± 0%   1.225k ± 0%  -1.69% (p=0.000 n=19+20)
Input/replace_with_clause_with_hardcoded_value_(no_dot_prefix)-8                                         1.274k ± 0%   1.253k ± 0%  -1.65% (p=0.000 n=20)
Input/replace_with_clause_with_hardcoded_value_(more_than_one_dot_prefix)-8                              1.274k ± 0%   1.253k ± 0%  -1.65% (n=20+19)
Input/replace_with_clause_with_hardcoded_value_containing_'.'_(dots)-8                                   1.247k ± 0%   1.226k ± 0%  -1.68% (n=18+20)
Input/global_transform_context_separation_with_parent_last_response_object-8                             2.762k ± 0%   2.719k ± 0%  -1.56% (p=0.000 n=20)
Input/cursor_value_is_updated_for_root_response_with_chaining_&_pagination-8                             3.493k ± 0%   3.449k ± 0%  -1.26% (p=0.000 n=20+19)
Input/cursor_value_is_updated_for_root_response_with_chaining_&_pagination_along_with_split_operator-8   3.570k ± 0%   3.527k ± 0%  -1.20% (p=0.000 n=19+20)
Input/Test_simple_XML_decode-8                                                                           50.30k ± 0%   50.30k ± 0%  -0.01% (p=0.000 n=18+20)
Input/split_events_by_not_found-8                                                                                      1.572k ± 1%
geomean                                                                                                  1.235k        1.227k       -1.29%
  • Loading branch information
efd6 authored and Scholar-Li committed Feb 5, 2024
1 parent eb245ad commit 7cb85bd
Show file tree
Hide file tree
Showing 6 changed files with 219 additions and 195 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
- Pin PyYAML version to 5.3.1 to avoid CI errors temporarily {pull}36091[36091]
- Skip dependabot updates for github.com/elastic/mito. {pull}36158[36158]
- Add device handling to Okta API package for entity analytics. {pull}35980[35980]
- Make Filebeat HTTPJSON input process responses sequentially. {pull}36493[36493]

==== Deprecated

Expand Down
183 changes: 112 additions & 71 deletions x-pack/filebeat/input/httpjson/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,9 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p

if len(r.requestFactories) == 1 {
finalResps = append(finalResps, httpResp)
events := r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, true)
n = processAndPublishEvents(trCtx, events, publisher, true, r.log)
p := newPublisher(trCtx, publisher, true, r.log)
r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, true, p)
n = p.eventCount()
continue
}

Expand Down Expand Up @@ -118,8 +119,9 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
return err
}
// we avoid unnecessary pagination here since chaining is present, thus avoiding any unexpected updates to cursor values
events := r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, false)
n = processAndPublishEvents(trCtx, events, publisher, false, r.log)
p := newPublisher(trCtx, publisher, false, r.log)
r.responseProcessors[i].startProcessing(stdCtx, trCtx, finalResps, false, p)
n = p.eventCount()
} else {
if len(ids) == 0 {
n = 0
Expand Down Expand Up @@ -187,13 +189,13 @@ func (r *requester) doRequest(stdCtx context.Context, trCtx *transformContext, p
resps = intermediateResps
}

var events <-chan maybeMsg
p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.log)
if rf.isChain {
events = rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true)
rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true, p)
} else {
events = r.responseProcessors[i].startProcessing(stdCtx, trCtx, resps, true)
r.responseProcessors[i].startProcessing(stdCtx, trCtx, resps, true, p)
}
n += processAndPublishEvents(chainTrCtx, events, publisher, i < len(r.requestFactories), r.log)
n += p.eventCount()
}
}

Expand Down Expand Up @@ -541,49 +543,71 @@ func (r *requester) getIdsFromResponses(intermediateResps []*http.Response, repl
// processRemainingChainEvents, processes the remaining pagination events for chain blocks
func (r *requester) processRemainingChainEvents(stdCtx context.Context, trCtx *transformContext, publisher inputcursor.Publisher, initialResp []*http.Response, chainIndex int) int {
// we start from 0, and skip the 1st event since we have already processed it
events := r.responseProcessors[0].startProcessing(stdCtx, trCtx, initialResp, true)
p := newChainProcessor(r, trCtx, publisher, chainIndex)
r.responseProcessors[0].startProcessing(stdCtx, trCtx, initialResp, true, p)
return p.eventCount()
}

var n int
var eventCount int
for maybeMsg := range events {
if maybeMsg.failed() {
r.log.Errorf("error processing response: %v", maybeMsg)
continue
}
type chainProcessor struct {
req *requester
ctx *transformContext
pub inputcursor.Publisher
idx int
tail bool
n int
}

if n >= 1 { // skip 1st event as it has already ben processed before
var response http.Response
response.StatusCode = 200
body := new(bytes.Buffer)
// we construct a new response here from each of the pagination events
err := json.NewEncoder(body).Encode(maybeMsg.msg)
if err != nil {
r.log.Errorf("error processing chain event: %w", err)
continue
}
response.Body = io.NopCloser(body)
func newChainProcessor(req *requester, trCtx *transformContext, pub inputcursor.Publisher, idx int) *chainProcessor {
return &chainProcessor{
req: req,
ctx: trCtx,
pub: pub,
idx: idx,
}
}

// updates the cursor for pagination last_event & last_response when chaining is present
trCtx.updateLastEvent(maybeMsg.msg)
trCtx.updateCursor()
func (p *chainProcessor) event(ctx context.Context, msg mapstr.M) {
if !p.tail {
// Skip first event as it has already been processed.
p.tail = true
return
}

// for each pagination response, we repeat all the chain steps / blocks
count, err := r.processChainPaginationEvents(stdCtx, trCtx, publisher, &response, chainIndex, r.log)
if err != nil {
r.log.Errorf("error processing chain event: %w", err)
continue
}
eventCount += count
var response http.Response
response.StatusCode = 200
body := new(bytes.Buffer)
// we construct a new response here from each of the pagination events
err := json.NewEncoder(body).Encode(msg)
if err != nil {
p.req.log.Errorf("error processing chain event: %w", err)
return
}
response.Body = io.NopCloser(body)

err = response.Body.Close()
if err != nil {
r.log.Errorf("error closing http response body: %w", err)
}
}
// updates the cursor for pagination last_event & last_response when chaining is present
p.ctx.updateLastEvent(msg)
p.ctx.updateCursor()

n++
// for each pagination response, we repeat all the chain steps / blocks
n, err := p.req.processChainPaginationEvents(ctx, p.ctx, p.pub, &response, p.idx, p.req.log)
if err != nil {
p.req.log.Errorf("error processing chain event: %w", err)
return
}
return eventCount
p.n += n

err = response.Body.Close()
if err != nil {
p.req.log.Errorf("error closing http response body: %w", err)
}
}

func (p *chainProcessor) fail(err error) {
p.req.log.Errorf("error processing response: %v", err)
}

func (p *chainProcessor) eventCount() int {
return p.n
}

// processChainPaginationEvents takes a pagination response as input and runs all the chain blocks for the input
Expand Down Expand Up @@ -675,8 +699,9 @@ func (r *requester) processChainPaginationEvents(stdCtx context.Context, trCtx *
}
resps = intermediateResps
}
events := rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true)
n += processAndPublishEvents(chainTrCtx, events, publisher, i < len(r.requestFactories), r.log)
p := newPublisher(chainTrCtx, publisher, i < len(r.requestFactories), r.log)
rf.chainResponseProcessor.startProcessing(stdCtx, chainTrCtx, resps, true, p)
n += p.eventCount()
}

defer func() {
Expand All @@ -697,36 +722,52 @@ func generateNewUrl(replacement, oldUrl, id string) (url.URL, error) {
return *newUrl, nil
}

// processAndPublishEvents process and publish events based on event type
func processAndPublishEvents(trCtx *transformContext, events <-chan maybeMsg, publisher inputcursor.Publisher, publish bool, log *logp.Logger) int {
var n int
for maybeMsg := range events {
if maybeMsg.failed() {
log.Errorf("error processing response: %v", maybeMsg)
continue
}
type publisher struct {
ctx *transformContext
pub inputcursor.Publisher
n int
log *logp.Logger
}

if publish {
event, err := makeEvent(maybeMsg.msg)
if err != nil {
log.Errorf("error creating event: %v", maybeMsg)
continue
}
func newPublisher(trCtx *transformContext, pub inputcursor.Publisher, publish bool, log *logp.Logger) *publisher {
if !publish {
pub = nil
}
return &publisher{
ctx: trCtx,
pub: pub,
log: log,
}
}

if err := publisher.Publish(event, trCtx.cursorMap()); err != nil {
log.Errorf("error publishing event: %v", err)
continue
}
}
if len(*trCtx.firstEventClone()) == 0 {
trCtx.updateFirstEvent(maybeMsg.msg)
func (p *publisher) event(_ context.Context, msg mapstr.M) {
if p.pub != nil {
event, err := makeEvent(msg)
if err != nil {
p.log.Errorf("error creating event: %v: %v", msg, err)
return
}
trCtx.updateLastEvent(maybeMsg.msg)
trCtx.updateCursor()

n++
if err := p.pub.Publish(event, p.ctx.cursorMap()); err != nil {
p.log.Errorf("error publishing event: %v", err)
return
}
}
if len(*p.ctx.firstEventClone()) == 0 {
p.ctx.updateFirstEvent(msg)
}
return n
p.ctx.updateLastEvent(msg)
p.ctx.updateCursor()

p.n++
}

func (p *publisher) fail(err error) {
p.log.Errorf("error processing response: %v", err)
}

func (p *publisher) eventCount() int {
return p.n
}

const (
Expand Down
120 changes: 59 additions & 61 deletions x-pack/filebeat/input/httpjson/response.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,83 +180,81 @@ func newChainResponseProcessor(config chainConfig, httpClient *httpClient, xmlDe
return rp
}

func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *transformContext, resps []*http.Response, paginate bool) <-chan maybeMsg {
type sendStream interface {
event(context.Context, mapstr.M)
fail(error)
}

func (rp *responseProcessor) startProcessing(stdCtx context.Context, trCtx *transformContext, resps []*http.Response, paginate bool, ch sendStream) {
trCtx.clearIntervalData()

ch := make(chan maybeMsg)
go func() {
defer close(ch)
var npages int64

for i, httpResp := range resps {
iter := rp.pagination.newPageIterator(stdCtx, trCtx, httpResp, rp.xmlDetails)
for {
pageStartTime := time.Now()
page, hasNext, err := iter.next()
if err != nil {
ch <- maybeMsg{err: err}
return
}
var npages int64
for i, httpResp := range resps {
iter := rp.pagination.newPageIterator(stdCtx, trCtx, httpResp, rp.xmlDetails)
for {
pageStartTime := time.Now()
page, hasNext, err := iter.next()
if err != nil {
ch.fail(err)
return
}

if !hasNext {
if i+1 != len(resps) {
break
}
return
if !hasNext {
if i+1 != len(resps) {
break
}
return
}

respTrs := page.asTransformables(rp.log)
respTrs := page.asTransformables(rp.log)

if len(respTrs) == 0 {
return
}
if len(respTrs) == 0 {
return
}

// last_response context object is updated here organically
trCtx.updateLastResponse(*page)
npages = page.page
// last_response context object is updated here organically
trCtx.updateLastResponse(*page)
npages = page.page

rp.log.Debugf("last received page: %#v", trCtx.lastResponse)
rp.log.Debugf("last received page: %#v", trCtx.lastResponse)

for _, tr := range respTrs {
for _, t := range rp.transforms {
tr, err = t.run(trCtx, tr)
if err != nil {
ch <- maybeMsg{err: err}
return
}
for _, tr := range respTrs {
for _, t := range rp.transforms {
tr, err = t.run(trCtx, tr)
if err != nil {
ch.fail(err)
return
}
}

if rp.split == nil {
ch <- maybeMsg{msg: tr.body()}
rp.log.Debug("no split found: continuing")
continue
}
if rp.split == nil {
ch.event(stdCtx, tr.body())
rp.log.Debug("no split found: continuing")
continue
}

if err := rp.split.run(trCtx, tr, ch); err != nil {
switch err { //nolint:errorlint // run never returns a wrapped error.
case errEmptyField:
// nothing else to send for this page
rp.log.Debug("split operation finished")
case errEmptyRootField:
// root field not found, most likely the response is empty
rp.log.Debug(err)
default:
rp.log.Debug("split operation failed")
ch <- maybeMsg{err: err}
return
}
if err := rp.split.run(trCtx, tr, ch); err != nil {
switch err { //nolint:errorlint // run never returns a wrapped error.
case errEmptyField:
// nothing else to send for this page
rp.log.Debug("split operation finished")
case errEmptyRootField:
// root field not found, most likely the response is empty
rp.log.Debug(err)
default:
rp.log.Debug("split operation failed")
ch.fail(err)
return
}
}
}

rp.metrics.updatePageExecutionTime(pageStartTime)
rp.metrics.updatePageExecutionTime(pageStartTime)

if !paginate {
break
}
if !paginate {
break
}
}
rp.metrics.updatePagesPerInterval(npages)
}()

return ch
}
rp.metrics.updatePagesPerInterval(npages)
}
Loading

0 comments on commit 7cb85bd

Please sign in to comment.