-
Notifications
You must be signed in to change notification settings - Fork 4.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
http_endpoint: Allow receiving multiple documents on a single request #25764
Conversation
Pinging @elastic/security-external-integrations (Team:Security-External Integrations) |
💚 Build Succeeded
Expand to view the summary
Build stats
Test stats 🧪
Trends 🧪💚 Flaky test reportTests succeeded. Expand to view the summary
Test stats 🧪
|
obj := bytes.TrimLeft(b, " \t\r\n") | ||
if len(obj) > 0 && obj[0] == '{' { | ||
return true | ||
func httpReadNDJSON(body io.Reader) (objs []common.MapStr, status int, err error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I might be misreading this, but could we just have one function? I think with json.NewDecoder we could handle:
{"a": 1}
or
{"a": 1}{"b":2}
or
{"a":1}
{"b":2}
or
[{"a":1},{"b":2}]
and then json or ndjson would both work.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Something like this:
func httpReadJSON(body io.Reader) (objs []common.MapStr, status int, err error) {
if body == http.NoBody {
return nil, http.StatusNotAcceptable, errBodyEmpty
}
decoder := json.NewDecoder(body)
for idx := 0; ; idx++ {
var obj interface{}
if err := decoder.Decode(&obj); err != nil {
if err == io.EOF {
break
}
return nil, http.StatusBadRequest, errors.Wrapf(err, "malformed JSON object at stream position %d", idx)
}
switch v := obj.(type) {
case map[string]interface{}:
objs = append(objs, v)
case []interface{}:
for listIdx, listObj := range v {
asMap, ok := listObj.(map[string]interface{})
if !ok {
return nil, http.StatusBadRequest, fmt.Errorf("%v at obj index %d, list index %d", errUnsupportedType, idx, listIdx)
}
objs = append(objs, asMap)
}
default:
return nil, http.StatusBadRequest, errUnsupportedType
}
}
return objs, 0, nil
}
I can push it to the PR if you want.
This pull request is now in conflicts. Could you fix it? 🙏
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In case 2 function approach is best fit.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Updates Filebeat's http_endpoint to produce multiple documents from a single POST request. This extends the application/json format handling to accept arrays of objects, and adds support for the NDJSON format (application/x-ndjson).
This uses a single parser that accepts both JSON and NDJSON
cac6941
to
ceb041e
Compare
/test |
This pull request is now in conflicts. Could you fix it? 🙏
|
…#25764) (#26175) Updates Filebeat's http_endpoint to produce multiple documents from a single POST request. This extends the application/json format handling to accept arrays of objects, and adds support for the NDJSON format (application/x-ndjson). (cherry picked from commit 8bbb26f) Co-authored-by: Adrian Serrano <adrisr83@gmail.com>
What does this PR do?
Updates Filebeat's
http_endpoint
to support receiving multiple documents from a single POST request.Until now it only accepted a single document (JSON object) per request.
With this PR:
Why is it important?
Minimizes the number of requests for high-volume ingestion.
Checklist
[ ] I have made corresponding change to the default configuration filesCHANGELOG.next.asciidoc
orCHANGELOG-developer.next.asciidoc
.