[filebeat][httpjson] - Separation of global transform contexts and introduction of parent transform context within chains #33499

Merged
merged 11 commits into from
Nov 8, 2022
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
@@ -149,6 +149,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Fix handling of invalid UserIP and LocalIP values. {pull}32896[32896]
- Allow http_endpoint instances to share ports. {issue}32578[32578] {pull}33377[33377]
- Improve httpjson documentation for split processor. {pull}33473[33473]
- Separate the global transform contexts inside httpjson and introduce the new clause `.parent_last_response.*` for use within chains. {pull}33499[33499]

*Auditbeat*

62 changes: 57 additions & 5 deletions x-pack/filebeat/docs/inputs/input-httpjson.asciidoc
@@ -531,7 +531,7 @@ List of transforms to apply to the request before each execution.

Available transforms for request: [`append`, `delete`, `set`].

Can read state from: [`.first_response.*`,`.last_response.*`, `.last_event.*`, `.cursor.*`, `.header.*`, `.url.*`, `.body.*`].
Can read state from: [`.first_response.*`, `.last_response.*`, `.parent_last_response.*`, `.last_event.*`, `.cursor.*`, `.header.*`, `.url.*`, `.body.*`].

Can write state to: [`body.*`, `header.*`, `url.*`].

@@ -547,6 +547,47 @@ filebeat.inputs:
value: '[[now (parseDuration "-1h")]]'
----

NOTE: Use the `.parent_last_response.` clause only from within chain steps, and only when pagination exists at the root request level. If the root
request is not paginated, use the `.first_response.` clause instead to access the parent response object from within chains. See the
<<parent-last-response,example>> below.


["source","yaml",subs="attributes",id="parent-last-response"]
filebeat.inputs:
- type: httpjson
  enabled: true
  id: my-httpjson-id
  request.url: http://xyz.com/services/data/v1.0/export_ids/page
  request.method: POST
  interval: 1h
  request.retry.max_attempts: 2
  request.retry.wait_min: 5s
  request.transforms:
  - set:
      target: body.page
      value: 0
  response.request_body_on_pagination: true
  response.pagination:
    - set:
        target: body.page
        value: '[[ .last_response.body.page ]]'
        fail_on_template_error: true
  chain:
    - step:
        request.url: http://xyz.com/services/data/v1.0/$.exportId/export_ids/$.files[:].id/info
        request.method: POST
        request.transforms:
        - set:
            target: body.exportId
            value: '[[ .parent_last_response.body.exportId ]]'
        replace: $.files[:].id
        replace_with: '$.exportId,.parent_last_response.body.exportId'

Here the chain step uses `.parent_last_response.body.exportId` only because `response.pagination` is present for the parent (root) request.
If `response.pagination` were not present in the parent (root) request, the `replace_with` clause should use `.first_response.body.exportId` instead. This is
because, when pagination does not exist at the parent level, the `parent_last_response` object is not populated with the required values for performance reasons, whereas the
`first_response` object always stores the very first response in the process chain.
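
The choice between the two context objects can be illustrated with a minimal Go sketch. It is not the httpjson implementation: the `page` and `chainContext` types and the `exportIDFor` function are hypothetical names introduced only for illustration, and the sketch assumes that `parent_last_response` is refreshed after every root page while `first_response` keeps the very first response.

```go
package main

import "fmt"

// page is a hypothetical, simplified stand-in for a decoded response body.
type page map[string]string

// chainContext is a hypothetical model of the state a chain step can read.
type chainContext struct {
	firstResponse      page // always the very first response in the process chain
	parentLastResponse page // assumed to be populated only when the root request paginates
}

// exportIDFor returns the exportId a chain step would read, choosing
// parent_last_response when the root request paginates and first_response otherwise.
func exportIDFor(ctx chainContext, rootHasPagination bool) (string, error) {
	src, name := ctx.firstResponse, "first_response"
	if rootHasPagination {
		src, name = ctx.parentLastResponse, "parent_last_response"
	}
	id, ok := src["exportId"] // reading from a nil map is safe and yields ok == false
	if !ok {
		return "", fmt.Errorf("%s.body.exportId is not available", name)
	}
	return id, nil
}

func main() {
	// Two root pages, each carrying its own exportId.
	pages := []page{{"exportId": "2212"}, {"exportId": "2213"}}
	ctx := chainContext{firstResponse: pages[0]}
	for _, p := range pages {
		ctx.parentLastResponse = p // refreshed after every root page
		id, err := exportIDFor(ctx, true)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		fmt.Println("chain step uses exportId", id)
	}
}
```

With a paginated root request, each chain step sees the exportId of the page that triggered it, which `first_response` alone could not provide for the second page.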

[float]
==== `request.tracer.filename`

@@ -1141,7 +1182,7 @@ Collect and make events from response in any format supported by httpjson for al

The `replace_with: "pattern,value"` clause is used to replace a fixed pattern string defined in `request.url` with the given value.
The fixed pattern must have a `$.` prefix, for example: `$.xyz`. The `value` may be hard coded or extracted from context variables
like [`.last_response.*`, `.first_response.*`] etc. The `replace_with` clause can be used in combination with the `replace` clause
like [`.last_response.*`, `.first_response.*`, `.parent_last_response.*`] etc. The `replace_with` clause can be used in combination with the `replace` clause
thus providing a lot of flexibility in the logic of chain requests.

Example:
@@ -1167,7 +1208,7 @@ filebeat.inputs:
- step:
request.url: https://example.com/services/data/v1.0/$.exportId/files
request.method: GET
replace_with: '$.exportId,first_response.body.exportId'
replace_with: '$.exportId,.first_response.body.exportId'
----

Example:
@@ -1217,8 +1258,19 @@ response_json using exportId as '2212':
----
This behaviour of targeted fixed pattern replacement in the url helps solve various use cases.
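
The fixed pattern replacement described above can be sketched in Go as plain exact-string substitution. This is only an illustration under stated assumptions: `expandChainURLs` is a hypothetical helper, not part of httpjson, and it assumes the fixed pattern is substituted first and the `replace` pattern is then expanded once per collected id.

```go
package main

import (
	"fmt"
	"strings"
)

// expandChainURLs is a hypothetical helper. It substitutes the fixed pattern
// (for example "$.exportId") with value using exact string matching, then
// produces one URL per id by substituting the replace pattern.
func expandChainURLs(urlTemplate, fixedPattern, value, replacePattern string, ids []string) []string {
	withFixed := strings.ReplaceAll(urlTemplate, fixedPattern, value)
	urls := make([]string, 0, len(ids))
	for _, id := range ids {
		urls = append(urls, strings.ReplaceAll(withFixed, replacePattern, id))
	}
	return urls
}

func main() {
	tmpl := "https://example.com/$.exportId/$.files[:].id"
	for _, u := range expandChainURLs(tmpl, "$.exportId", "2212", "$.files[:].id", []string{"1", "2"}) {
		fmt.Println(u)
	}
}
```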

NOTE: Fixed patterns must not contain commas in their definition. String replacement patterns are matched by the
`replace_with` processor with exact string matching.
**Some useful points to remember:**

1. For the `value` to be treated as an expression that extracts data from context variables, it must have exactly
**one '.' (dot) prefix**. Example: `replace_with: '$.exportId,.first_response.body.exportId'`. With more or fewer dots, the internal
processor treats it as a hard coded value, for example `replace_with: '$.exportId,..first_response.body.exportId'` (more than one '.' (dot) as prefix) or
`replace_with: '$.exportId,first_response.body.exportId'` (no '.' (dot) as prefix).

2. Incomplete `value expressions` cause an error during processing. Examples: `replace_with: '$.exportId,.first_response.'` and
`replace_with: '$.exportId,.last_response.'`. These expressions are incomplete because they do not evaluate down to a valid key that can be extracted from
the context variables. When processed, the value expression `.first_response.` results in the array `[first_response ""]`, where the key to be
extracted is `""` (an empty string), which has no definition within any context variable.

NOTE: Fixed patterns must not contain commas in their definition. String replacement patterns are matched by the `replace_with` processor with exact string matching.
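
The rules above can be summarised in a small Go sketch. It is a hypothetical classifier, not the actual httpjson parser: `parseReplaceWith`, `replaceWith`, and `knownContexts` are names invented for illustration, and the sketch assumes that only values whose first segment names a known context variable are treated as expressions (consistent with a value such as `.xyz.2212.abc.` being used as a hard coded string).

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// knownContexts lists the context variables assumed for this sketch.
var knownContexts = map[string]bool{
	"first_response":       true,
	"last_response":        true,
	"parent_last_response": true,
}

// replaceWith is a hypothetical parsed form of a "pattern,value" clause.
type replaceWith struct {
	Pattern      string // fixed pattern, must start with "$."
	Value        string // hard coded value, or the key path when IsExpression is true
	Context      string // context variable name when IsExpression is true
	IsExpression bool
}

// parseReplaceWith splits the clause at the first comma and classifies the value.
func parseReplaceWith(clause string) (replaceWith, error) {
	pattern, value, ok := strings.Cut(clause, ",")
	if !ok {
		return replaceWith{}, errors.New(`replace_with must have the form "pattern,value"`)
	}
	if !strings.HasPrefix(pattern, "$.") {
		return replaceWith{}, fmt.Errorf("fixed pattern %q must have a $. prefix", pattern)
	}
	rw := replaceWith{Pattern: pattern, Value: value}
	// Anything other than exactly one leading dot is a hard coded value.
	if !strings.HasPrefix(value, ".") || strings.HasPrefix(value, "..") {
		return rw, nil
	}
	parts := strings.SplitN(strings.TrimPrefix(value, "."), ".", 2)
	if !knownContexts[parts[0]] {
		return rw, nil // assumption: unknown names stay hard coded
	}
	// ".first_response." splits into [first_response ""], an empty key.
	if len(parts) < 2 || parts[1] == "" {
		return replaceWith{}, fmt.Errorf("incomplete value expression %q", value)
	}
	rw.IsExpression, rw.Context, rw.Value = true, parts[0], parts[1]
	return rw, nil
}

func main() {
	for _, clause := range []string{
		"$.exportId,.first_response.body.exportId",
		"$.exportId,first_response.body.exportId",
		"$.exportId,..first_response.body.exportId",
		"$.exportId,.first_response.",
	} {
		rw, err := parseReplaceWith(clause)
		fmt.Printf("%q -> %+v, err=%v\n", clause, rw, err)
	}
}
```

Splitting at the first comma only is what makes commas inside the fixed pattern impossible to support in this sketch, which mirrors the restriction stated in the note above.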

[float]
==== `chain[].while`
227 changes: 223 additions & 4 deletions x-pack/filebeat/input/httpjson/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ package httpjson
import (
"context"
"fmt"
"io/ioutil"
"io"
"math/rand"
"net/http"
"net/http/httptest"
@@ -683,17 +683,227 @@ func TestInput(t *testing.T) {
"step": map[string]interface{}{
"request.method": http.MethodGet,
"replace": "$.files[:].id",
"replace_with": "$.exportId,first_response.body.exportId",
"replace_with": "$.exportId,.first_response.body.exportId",
},
},
},
},
expected: []string{
`{"hello":{"world":"moon"}}`,
`{"space":{"cake":"pumpkin"}}`,
},
},
{
name: "Test replace_with clause with hardcoded value_1",
setupServer: func(t *testing.T, h http.HandlerFunc, config map[string]interface{}) {
r := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/":
fmt.Fprintln(w, `{"files":[{"id":"1"},{"id":"2"}]}`)
case "/2212/1":
fmt.Fprintln(w, `{"hello":{"world":"moon"}}`)
case "/2212/2":
fmt.Fprintln(w, `{"space":{"cake":"pumpkin"}}`)
}
})
server := httptest.NewServer(r)
config["request.url"] = server.URL
config["chain.0.step.request.url"] = server.URL + "/$.exportId/$.files[:].id"
t.Cleanup(server.Close)
},
baseConfig: map[string]interface{}{
"interval": 1,
"request.method": http.MethodGet,
"chain": []interface{}{
map[string]interface{}{
"step": map[string]interface{}{
"request.method": http.MethodGet,
"replace": "$.files[:].id",
"replace_with": "$.exportId,2212",
},
},
},
},
expected: []string{
`{"hello":{"world":"moon"}}`,
`{"space":{"cake":"pumpkin"}}`,
},
},
{
name: "Test replace_with clause with hardcoded value (no dot prefix)",
setupServer: func(t *testing.T, h http.HandlerFunc, config map[string]interface{}) {
r := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/":
fmt.Fprintln(w, `{"files":[{"id":"1"},{"id":"2"}]}`)
case "/first_response.body.id/1":
fmt.Fprintln(w, `{"hello":{"world":"moon"}}`)
case "/first_response.body.id/2":
fmt.Fprintln(w, `{"space":{"cake":"pumpkin"}}`)
}
})
server := httptest.NewServer(r)
config["request.url"] = server.URL
config["chain.0.step.request.url"] = server.URL + "/$.exportId/$.files[:].id"
t.Cleanup(server.Close)
},
baseConfig: map[string]interface{}{
"interval": 1,
"request.method": http.MethodGet,
"chain": []interface{}{
map[string]interface{}{
"step": map[string]interface{}{
"request.method": http.MethodGet,
"replace": "$.files[:].id",
"replace_with": "$.exportId,first_response.body.id",
},
},
},
},
handler: defaultHandler(http.MethodGet, "", ""),
expected: []string{
`{"hello":{"world":"moon"}}`,
`{"space":{"cake":"pumpkin"}}`,
},
},
{
name: "Test replace_with clause with hardcoded value (more than one dot prefix)",
setupServer: func(t *testing.T, h http.HandlerFunc, config map[string]interface{}) {
r := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/":
fmt.Fprintln(w, `{"files":[{"id":"1"},{"id":"2"}]}`)
case "/..first_response.body.id/1":
fmt.Fprintln(w, `{"hello":{"world":"moon"}}`)
case "/..first_response.body.id/2":
fmt.Fprintln(w, `{"space":{"cake":"pumpkin"}}`)
}
})
server := httptest.NewServer(r)
config["request.url"] = server.URL
config["chain.0.step.request.url"] = server.URL + "/$.exportId/$.files[:].id"
t.Cleanup(server.Close)
},
baseConfig: map[string]interface{}{
"interval": 1,
"request.method": http.MethodGet,
"chain": []interface{}{
map[string]interface{}{
"step": map[string]interface{}{
"request.method": http.MethodGet,
"replace": "$.files[:].id",
"replace_with": "$.exportId,..first_response.body.id",
},
},
},
},
expected: []string{
`{"hello":{"world":"moon"}}`,
`{"space":{"cake":"pumpkin"}}`,
},
},
{
name: "Test replace_with clause with hardcoded value containing '.' (dots)",
setupServer: func(t *testing.T, h http.HandlerFunc, config map[string]interface{}) {
r := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/":
fmt.Fprintln(w, `{"files":[{"id":"1"},{"id":"2"}]}`)
case "/.xyz.2212.abc./1":
fmt.Fprintln(w, `{"hello":{"world":"moon"}}`)
case "/.xyz.2212.abc./2":
fmt.Fprintln(w, `{"space":{"cake":"pumpkin"}}`)
}
})
server := httptest.NewServer(r)
config["request.url"] = server.URL
config["chain.0.step.request.url"] = server.URL + "/$.exportId/$.files[:].id"
t.Cleanup(server.Close)
},
baseConfig: map[string]interface{}{
"interval": 1,
"request.method": http.MethodGet,
"chain": []interface{}{
map[string]interface{}{
"step": map[string]interface{}{
"request.method": http.MethodGet,
"replace": "$.files[:].id",
"replace_with": "$.exportId,.xyz.2212.abc.",
},
},
},
},
expected: []string{
`{"hello":{"world":"moon"}}`,
`{"space":{"cake":"pumpkin"}}`,
},
},
{
name: "Test global transform context separation with parent_last_response object",
setupServer: func(t *testing.T, h http.HandlerFunc, config map[string]interface{}) {
var serverURL string
registerPaginationTransforms()
registerRequestTransforms()
r := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/":
fmt.Fprintf(w, `{"files":[{"id":"1"},{"id":"2"}],"exportId":"2212", "nextLink":"%s/link1"}`, serverURL)
case "/link1":
fmt.Fprintln(w, `{"files":[{"id":"3"},{"id":"4"}], "exportId":"2213"}`)
case "/2212/1":
matchBody(w, r, `{"exportId":"2212"}`, `{"hello":{"world":"moon"}}`)
case "/2212/2":
matchBody(w, r, `{"exportId":"2212"}`, `{"space":{"cake":"pumpkin"}}`)
case "/2213/3":
matchBody(w, r, `{"exportId":"2213"}`, `{"hello":{"cake":"pumpkin"}}`)
case "/2213/4":
matchBody(w, r, `{"exportId":"2213"}`, `{"space":{"world":"moon"}}`)
}
})
server := httptest.NewServer(r)
t.Cleanup(func() { registeredTransforms = newRegistry() })
config["request.url"] = server.URL
serverURL = server.URL
config["chain.0.step.request.url"] = server.URL + "/$.exportId/$.files[:].id"
t.Cleanup(server.Close)
},
baseConfig: map[string]interface{}{
"interval": 1,
"request.method": http.MethodPost,
"response.request_body_on_pagination": true,
"response.pagination": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"target": "url.value",
"value": "[[.last_response.body.nextLink]]",
"fail_on_template_error": true,
},
},
},
"chain": []interface{}{
map[string]interface{}{
"step": map[string]interface{}{
"request.method": http.MethodPost,
"replace": "$.files[:].id",
"replace_with": "$.exportId,.parent_last_response.body.exportId",
"request.transforms": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"target": "body.exportId",
"value": "[[ .parent_last_response.body.exportId ]]",
},
},
},
},
},
},
},
expected: []string{
`{"hello":{"world":"moon"}}`,
`{"space":{"cake":"pumpkin"}}`,
`{"hello":{"cake":"pumpkin"}}`,
`{"space":{"world":"moon"}}`,
},
},
}

for _, testCase := range testCases {
@@ -826,6 +1036,15 @@ func newV2Context() (v2.Context, func()) {
}, cancel
}

//nolint:errcheck // We can safely ignore errors here
func matchBody(w io.Writer, req *http.Request, match, response string) {
body, _ := io.ReadAll(req.Body)
req.Body.Close()
if string(body) == match {
w.Write([]byte(response))
}
}

func defaultHandler(expectedMethod, expectedBody, msg string) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("content-type", "application/json")
@@ -837,7 +1056,7 @@ func defaultHandler(expectedMethod, expectedBody, msg string) http.HandlerFunc {
w.WriteHeader(http.StatusBadRequest)
msg = fmt.Sprintf(`{"error":"expected method was %q"}`, expectedMethod)
case expectedBody != "":
body, _ := ioutil.ReadAll(r.Body)
body, _ := io.ReadAll(r.Body)
r.Body.Close()
if expectedBody != string(body) {
w.WriteHeader(http.StatusBadRequest)