Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix chain call issue in HTTP JSON input #34325

Closed
wants to merge 31 commits into from
Closed
Show file tree
Hide file tree
Changes from 8 commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
72afcf0
fix chain call issue in HTTP JSON input
kush-elastic Jan 20, 2023
88a3ed2
exit go routines in case of errors and update unit tests
kush-elastic Jan 27, 2023
6a95051
add unit tests for different cases
kush-elastic Feb 2, 2023
12998fc
add change log entry
kush-elastic Feb 3, 2023
723ab2c
Merge branch 'main' into httjson-chaincall-fix
kush-elastic Feb 3, 2023
5676eab
address review comments
kush-elastic Feb 13, 2023
ca5109f
fix id collection concurrency issue
kush-elastic Feb 13, 2023
4e9bccb
Merge branch 'main' into httjson-chaincall-fix
kush-elastic Feb 13, 2023
b930e45
address review comments
kush-elastic Feb 14, 2023
b295f27
address review comment-add function description
kush-elastic Feb 15, 2023
800169c
Merge branch 'main' into httjson-chaincall-fix
kush-elastic Feb 16, 2023
9a05fc7
Merge branch 'main' into httjson-chaincall-fix
kush-elastic Feb 22, 2023
889d13f
Merge branch 'main' into httjson-chaincall-fix
kush-elastic Mar 22, 2023
e2294d3
resolve golang-ci lint errors
kush-elastic Mar 22, 2023
7952476
Merge branch 'main' into httjson-chaincall-fix
kush-elastic Apr 3, 2023
1558a6a
Merge branch 'main' into httjson-chaincall-fix
kush-elastic Apr 4, 2023
44771c4
Merge branch 'main' into httjson-chaincall-fix
kush-elastic Apr 10, 2023
956dced
Merge branch 'main' of https://github.com/kush-elastic/beats into htt…
kush-elastic May 11, 2023
d7ac3ed
Merge branch 'main' of https://github.com/kush-elastic/beats into htt…
kush-elastic May 22, 2023
c663ced
Merge branch 'main' of https://github.com/kush-elastic/beats into htt…
kush-elastic Jun 6, 2023
e3e52a3
resolve golangci-lint issues
kush-elastic Jun 6, 2023
bc0af78
Merge branch 'main' of https://github.com/kush-elastic/beats into htt…
kush-elastic Jun 13, 2023
8f640f0
remove fmt.Print*
kush-elastic Jun 13, 2023
68a5711
Merge branch 'main' into httjson-chaincall-fix
kush-elastic Jun 13, 2023
d15310d
Merge branch 'main' into httjson-chaincall-fix
kush-elastic Jun 14, 2023
992643d
Merge branch 'main' into httjson-chaincall-fix
kush-elastic Jun 16, 2023
e9343b3
Merge branch 'main' into httjson-chaincall-fix
kush-elastic Jun 23, 2023
8e4b33f
resolve review comments
kush-elastic Jul 10, 2023
49c61f4
Merge branch 'main' into httjson-chaincall-fix
kush-elastic Jul 10, 2023
91f1ac3
resolve comment and remove white space changes
kush-elastic Jul 10, 2023
ad7bddf
remove white space
kush-elastic Jul 10, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
- Fix handling of quoted values in auditd module. {issue}22587[22587] {pull}34069[34069]
- Fixing system tests not returning expected content encoding for azure blob storage input. {pull}34412[34412]
- [Azure Logs] Fix authentication_processing_details parsing in sign-in logs. {issue}34330[34330] {pull}34478[34478]
- Fix `httpjson` input storing all the responses in memory for one iteration of chain call. {issue}34310[34310] {pull}34325[34325]


*Heartbeat*

Expand Down
182 changes: 178 additions & 4 deletions x-pack/filebeat/input/httpjson/input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"math/rand"
"net/http"
"net/http/httptest"
"strings"
"testing"
"time"

Expand All @@ -23,7 +24,15 @@ import (
"github.com/elastic/elastic-agent-libs/logp"
)

const dummyText = "Lorem Ipsum is simply dummy text of the printing and typesetting industry. Lorem Ipsum has been the industry's standard dummy text ever since the 1500s, when an unknown printer took a galley of type and scrambled it to make a type specimen book. It has survived not only five centuries, but also the leap into electronic typesetting, remaining essentially unchanged. It was popularised in the 1960s with the release of Letraset sheets containing Lorem Ipsum passages, and more recently with desktop publishing software like Aldus PageMaker including versions of Lorem Ipsum."
kush-elastic marked this conversation as resolved.
Show resolved Hide resolved

func TestInput(t *testing.T) {
var expectedBulkData []string
for i := 0; i < 2; i++ {
for j := 0; j < 2500; j++ {
expectedBulkData = append(expectedBulkData, fmt.Sprintf(`{ "text%d":%q }`, i+1, strings.Repeat(dummyText, 17)))
}
}
testCases := []struct {
name string
setupServer func(*testing.T, http.HandlerFunc, map[string]interface{})
Expand Down Expand Up @@ -678,6 +687,31 @@ func TestInput(t *testing.T) {
`{"space":{"cake":"pumpkin"}}`,
},
},
{
name: "Test pagination with end condition",
setupServer: newPaginationTestServer(httptest.NewServer),
baseConfig: map[string]interface{}{
"interval": 1,
"request.method": http.MethodGet,
"response.split": map[string]interface{}{
"target": "body.records",
},
"response.pagination": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"target": "url.value",
"value": "[[if (ne .last_response.body.done true)]][[.last_response.body.nextLink]][[end]]",
"fail_on_template_error": true,
},
},
},
},
handler: defaultHandler(http.MethodGet, "", ""),
expected: []string{
`{"id":1}`,
`{"id":2}`,
},
},
{
name: "Test pagination when used with chaining",
setupServer: newChainPaginationTestServer(httptest.NewServer),
Expand Down Expand Up @@ -708,6 +742,60 @@ func TestInput(t *testing.T) {
`{"space":{"cake":"pumpkin"}}`,
},
},
{
name: "Test pagination when used with chaining Unable to get id from response",
setupServer: newChainPaginationTestServerWithInvalidJSON(httptest.NewServer),
baseConfig: map[string]interface{}{
"interval": 1,
"request.method": http.MethodGet,
"response.pagination": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"target": "url.value",
"value": "[[.last_response.body.nextLink]]",
"fail_on_template_error": true,
},
},
},
"chain": []interface{}{
map[string]interface{}{
"step": map[string]interface{}{
"request.method": http.MethodGet,
"replace": "$.records[:].id",
},
},
},
},
handler: defaultHandler(http.MethodGet, "", ""),
expected: []string{},
},
{
name: "Test pagination when used with chaining with bulk data",
setupServer: newChainPaginationLoadTestServer(httptest.NewServer),
baseConfig: map[string]interface{}{
"interval": 1,
"request.method": http.MethodGet,
"response.pagination": []interface{}{
map[string]interface{}{
"set": map[string]interface{}{
"target": "url.value",
"value": "[[.last_response.body.nextLink]]",
"fail_on_template_error": true,
},
},
},
"chain": []interface{}{
map[string]interface{}{
"step": map[string]interface{}{
"request.method": http.MethodGet,
"replace": "$.records[:].id",
},
},
},
},
handler: defaultHandler(http.MethodGet, "", ""),
expected: expectedBulkData,
},
{
name: "Test replace_with clause and first_response object",
setupServer: func(t *testing.T, h http.HandlerFunc, config map[string]interface{}) {
Expand Down Expand Up @@ -973,7 +1061,7 @@ func TestInput(t *testing.T) {
r := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/":
fmt.Fprintf(w, `{"files":[{"id":"1"},{"id":"2"}],"exportId":"2212", "createdAt":"22/02/2022",
fmt.Fprintf(w, `{"files":[{"id":"1"},{"id":"2"}],"exportId":"2212", "createdAt":"22/02/2022",
"nextLink":"%s/link1"}`, serverURL)
case "/link1":
fmt.Fprintln(w, `{"files":[{"id":"3"},{"id":"4"}], "exportId":"2213", "createdAt":"24/04/2022"}`)
Expand Down Expand Up @@ -1052,7 +1140,7 @@ func TestInput(t *testing.T) {
r := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/":
fmt.Fprintf(w, `{"files":[{"id":"1"},{"id":"2"}],"exportId":"2212","time":[{"timeStamp":"22/02/2022"}],
fmt.Fprintf(w, `{"files":[{"id":"1"},{"id":"2"}],"exportId":"2212","time":[{"timeStamp":"22/02/2022"}],
"nextLink":"%s/link1"}`, serverURL)
case "/link1":
fmt.Fprintln(w, `{"files":[{"id":"3"},{"id":"4"}], "exportId":"2213","time":[{"timeStamp":"24/04/2022"}]}`)
Expand Down Expand Up @@ -1154,8 +1242,13 @@ func TestInput(t *testing.T) {
g.Go(func() error {
return input.Run(ctx, chanClient)
})

timeout := time.NewTimer(5 * time.Second)
var timeoutDuration time.Duration
if testCase.name == "Test pagination when used with chaining with bulk data" {
timeoutDuration = 2 * time.Minute
} else {
timeoutDuration = 10 * time.Second
}
timeout := time.NewTimer(timeoutDuration)
t.Cleanup(func() { _ = timeout.Stop() })

if len(tc.expected) == 0 {
Expand Down Expand Up @@ -1250,6 +1343,87 @@ func newChainPaginationTestServer(
}
}

func newChainPaginationTestServerWithInvalidJSON(
newServer func(http.Handler) *httptest.Server,
) func(*testing.T, http.HandlerFunc, map[string]interface{}) {
return func(t *testing.T, h http.HandlerFunc, config map[string]interface{}) {
registerPaginationTransforms()
var serverURL string
r := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/":
link := serverURL + "/link2"
value := fmt.Sprintf(`"records":[{"id":1}], "nextLink":"%s"}`, link)
fmt.Fprintln(w, value)
case "/1":
fmt.Fprintln(w, `{"hello":{"world":"moon"}}`)
}
})
server := httptest.NewServer(r)
config["request.url"] = server.URL
serverURL = server.URL
config["chain.0.step.request.url"] = server.URL + "/$.records[:].id"
t.Cleanup(func() { registeredTransforms = newRegistry() })
}
}

func newPaginationTestServer(
newServer func(http.Handler) *httptest.Server,
) func(*testing.T, http.HandlerFunc, map[string]interface{}) {
return func(t *testing.T, h http.HandlerFunc, config map[string]interface{}) {
registerPaginationTransforms()
registerResponseTransforms()
var serverURL string
r := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/":
link := serverURL + "/link2"
value := fmt.Sprintf(`{"records":[{"id":1}], "nextLink":"%s", "done": false}`, link)
fmt.Fprintln(w, value)
case "/link2":
link := serverURL + "/"
value := fmt.Sprintf(`{"records":[{"id":2}], "nextLink":"%s","done": true}`, link)
fmt.Fprintln(w, value)
}
})
server := httptest.NewServer(r)
config["request.url"] = server.URL
serverURL = server.URL
t.Cleanup(func() { registeredTransforms = newRegistry() })
}
}

func newChainPaginationLoadTestServer(
newServer func(http.Handler) *httptest.Server,
) func(*testing.T, http.HandlerFunc, map[string]interface{}) {
records1 := strings.Repeat(`{"id":1},`, 2499) + `{"id":1}`
records2 := strings.Repeat(`{"id":2},`, 2499) + `{"id":2}`

return func(t *testing.T, h http.HandlerFunc, config map[string]interface{}) {
registerPaginationTransforms()
var serverURL string
r := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/":
link := serverURL + "/link2"
value := fmt.Sprintf(`{"records":[`+records1+`], "nextLink":"%s"}`, link)
fmt.Fprintln(w, value)
case "/1":
fmt.Fprintf(w, `{ "text1":%q }`+"\n", strings.Repeat(dummyText, 17))
case "/link2":
fmt.Fprintln(w, `{"records":[`+records2+`]}`)
case "/2":
fmt.Fprintf(w, `{ "text2":%q }`+"\n", strings.Repeat(dummyText, 17))
}
})
server := httptest.NewServer(r)
config["request.url"] = server.URL
serverURL = server.URL
config["chain.0.step.request.url"] = server.URL + "/$.records[:].id"
t.Cleanup(func() { registeredTransforms = newRegistry() })
}
}

func newV2Context() (v2.Context, func()) {
ctx, cancel := context.WithCancel(context.Background())
return v2.Context{
Expand Down
Loading