Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

x-pack/filebeat/input/httpjson: reorganise code to improve code locality #36439

Merged
merged 6 commits into from
Aug 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion x-pack/filebeat/input/httpjson/config_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func (c *requestConfig) Validate() error {
return fmt.Errorf("unsupported method %q", c.Method)
}

if _, err := newBasicTransformsFromConfig(c.Transforms, requestNamespace, nil); err != nil {
if _, err := newBasicTransformsFromConfig(registeredTransforms, c.Transforms, requestNamespace, nil); err != nil {
return err
}

Expand Down
6 changes: 3 additions & 3 deletions x-pack/filebeat/input/httpjson/config_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ type splitConfig struct {
}

func (c *responseConfig) Validate() error {
if _, err := newBasicTransformsFromConfig(c.Transforms, responseNamespace, nil); err != nil {
if _, err := newBasicTransformsFromConfig(registeredTransforms, c.Transforms, responseNamespace, nil); err != nil {
return err
}
if _, err := newBasicTransformsFromConfig(c.Pagination, paginationNamespace, nil); err != nil {
if _, err := newBasicTransformsFromConfig(registeredTransforms, c.Pagination, paginationNamespace, nil); err != nil {
return err
}
if c.DecodeAs != "" {
Expand All @@ -52,7 +52,7 @@ func (c *responseConfig) Validate() error {
}

func (c *splitConfig) Validate() error {
if _, err := newBasicTransformsFromConfig(c.Transforms, responseNamespace, nil); err != nil {
if _, err := newBasicTransformsFromConfig(registeredTransforms, c.Transforms, responseNamespace, nil); err != nil {
return err
}

Expand Down
109 changes: 33 additions & 76 deletions x-pack/filebeat/input/httpjson/encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,57 +14,8 @@ import (
"net/http"

"github.com/elastic/mito/lib/xml"

"github.com/elastic/elastic-agent-libs/logp"
)

type encoderFunc func(trReq transformable) ([]byte, error)

type decoderFunc func(p []byte, dst *response) error

var (
registeredEncoders = map[string]encoderFunc{}
registeredDecoders = map[string]decoderFunc{}
defaultEncoder encoderFunc = encodeAsJSON
defaultDecoder decoderFunc = decodeAsJSON
)

func registerEncoder(contentType string, enc encoderFunc) error {
if contentType == "" {
return errors.New("content-type can't be empty")
}

if enc == nil {
return errors.New("encoder can't be nil")
}

if _, found := registeredEncoders[contentType]; found {
return errors.New("already registered")
}

registeredEncoders[contentType] = enc

return nil
}

func registerDecoder(contentType string, dec decoderFunc) error {
if contentType == "" {
return errors.New("content-type can't be empty")
}

if dec == nil {
return errors.New("decoder can't be nil")
}

if _, found := registeredDecoders[contentType]; found {
return errors.New("already registered")
}

registeredDecoders[contentType] = dec

return nil
}

func encode(contentType string, trReq transformable) ([]byte, error) {
enc, found := registeredEncoders[contentType]
if !found {
Expand All @@ -81,35 +32,34 @@ func decode(contentType string, p []byte, dst *response) error {
return dec(p, dst)
}

func registerEncoders() {
log := logp.L().Named(logName)
log.Debugf("registering encoder 'application/json': returned error: %#v",
registerEncoder("application/json", encodeAsJSON))

log.Debugf("registering encoder 'application/x-www-form-urlencoded': returned error: %#v",
registerEncoder("application/x-www-form-urlencoded", encodeAsForm))
}

func registerDecoders() {
log := logp.L().Named(logName)
log.Debugf("registering decoder 'application/json': returned error: %#v",
registerDecoder("application/json", decodeAsJSON))

log.Debugf("registering decoder 'application/x-ndjson': returned error: %#v",
registerDecoder("application/x-ndjson", decodeAsNdjson))

log.Debugf("registering decoder 'text/csv': returned error: %#v",
registerDecoder("text/csv", decodeAsCSV))

log.Debugf("registering decoder 'application/zip': returned error: %#v",
registerDecoder("application/zip", decodeAsZip))
var (
// registeredEncoders is the set of available encoders.
registeredEncoders = map[string]encoderFunc{
"application/json": encodeAsJSON,
"application/x-www-form-urlencoded": encodeAsForm,
}
// defaultEncoder is the decoder used when no registers
// encoder is available.
defaultEncoder = encodeAsJSON

// registeredDecoders is the set of available decoders.
registeredDecoders = map[string]decoderFunc{
"application/json": decodeAsJSON,
"application/x-ndjson": decodeAsNdjson,
"text/csv": decodeAsCSV,
"application/zip": decodeAsZip,
"application/xml": decodeAsXML,
"text/xml; charset=utf-8": decodeAsXML,
}
// defaultDecoder is the decoder used when no registers
// decoder is available.
defaultDecoder = decodeAsJSON
)

log.Debugf("registering decoder 'application/xml': returned error: %#v",
registerDecoder("application/xml", decodeAsXML))
log.Debugf("registering decoder 'text/xml': returned error: %#v",
registerDecoder("text/xml; charset=utf-8", decodeAsXML))
}
type encoderFunc func(trReq transformable) ([]byte, error)
type decoderFunc func(p []byte, dst *response) error

// encodeAsJSON encodes trReq as a JSON message.
func encodeAsJSON(trReq transformable) ([]byte, error) {
if len(trReq.body()) == 0 {
return nil, nil
Expand All @@ -120,10 +70,12 @@ func encodeAsJSON(trReq transformable) ([]byte, error) {
return json.Marshal(trReq.body())
}

// decodeAsJSON decodes the JSON message in p into dst.
func decodeAsJSON(p []byte, dst *response) error {
return json.Unmarshal(p, &dst.body)
}

// encodeAsForm encodes trReq as a URL encoded form.
func encodeAsForm(trReq transformable) ([]byte, error) {
url := trReq.url()
body := []byte(url.RawQuery)
Expand All @@ -135,6 +87,8 @@ func encodeAsForm(trReq transformable) ([]byte, error) {
return body, nil
}

// decodeAsNdjson decodes the message in p as a JSON object stream
// It is more relaxed than NDJSON.
func decodeAsNdjson(p []byte, dst *response) error {
var results []interface{}
dec := json.NewDecoder(bytes.NewReader(p))
Expand All @@ -149,6 +103,7 @@ func decodeAsNdjson(p []byte, dst *response) error {
return nil
}

// decodeAsCSV decodes p as a headed CSV document into dst.
func decodeAsCSV(p []byte, dst *response) error {
var results []interface{}

Expand Down Expand Up @@ -189,6 +144,7 @@ func decodeAsCSV(p []byte, dst *response) error {
return nil
}

// decodeAsZip decodes p as a ZIP archive into dst.
func decodeAsZip(p []byte, dst *response) error {
var results []interface{}
r, err := zip.NewReader(bytes.NewReader(p), int64(len(p)))
Expand Down Expand Up @@ -225,6 +181,7 @@ func decodeAsZip(p []byte, dst *response) error {
return nil
}

// decodeAsXML decodes p as an XML document into dst.
func decodeAsXML(p []byte, dst *response) error {
cdata, body, err := xml.Unmarshal(bytes.NewReader(p), dst.xmlDetails)
if err != nil {
Expand Down
37 changes: 37 additions & 0 deletions x-pack/filebeat/input/httpjson/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,43 @@ func newNetHTTPClient(ctx context.Context, cfg *requestConfig, log *logp.Logger,
return netHTTPClient, nil
}

func newChainHTTPClient(ctx context.Context, authCfg *authConfig, requestCfg *requestConfig, log *logp.Logger, reg *monitoring.Registry, p ...*Policy) (*httpClient, error) {
// Make retryable HTTP client
netHTTPClient, err := newNetHTTPClient(ctx, requestCfg, log, reg)
if err != nil {
return nil, err
}

var retryPolicyFunc retryablehttp.CheckRetry
if len(p) != 0 {
retryPolicyFunc = p[0].CustomRetryPolicy
} else {
retryPolicyFunc = retryablehttp.DefaultRetryPolicy
}

client := &retryablehttp.Client{
HTTPClient: netHTTPClient,
Logger: newRetryLogger(log),
RetryWaitMin: requestCfg.Retry.getWaitMin(),
RetryWaitMax: requestCfg.Retry.getWaitMax(),
RetryMax: requestCfg.Retry.getMaxAttempts(),
CheckRetry: retryPolicyFunc,
Backoff: retryablehttp.DefaultBackoff,
}

limiter := newRateLimiterFromConfig(requestCfg.RateLimit, log)

if authCfg != nil && authCfg.OAuth2.isEnabled() {
authClient, err := authCfg.OAuth2.client(ctx, client.StandardClient())
if err != nil {
return nil, err
}
return &httpClient{client: authClient, limiter: limiter}, nil
}

return &httpClient{client: client.StandardClient(), limiter: limiter}, nil
}

// clientOption returns constructed client configuration options, including
// setting up http+unix and http+npipe transports if requested.
func clientOptions(u *url.URL, keepalive httpcommon.WithKeepaliveSettings) []httpcommon.TransportOption {
Expand Down
5 changes: 0 additions & 5 deletions x-pack/filebeat/input/httpjson/input_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,11 +41,6 @@ func NewInputManager(log *logp.Logger, store inputcursor.StateStore) InputManage

// Init initializes both wrapped input managers.
func (m InputManager) Init(grp unison.Group, mode v2.Mode) error {
registerRequestTransforms()
registerResponseTransforms()
registerPaginationTransforms()
registerEncoders()
registerDecoders()
return multierr.Append(
m.stateless.Init(grp, mode),
m.cursor.Init(grp, mode),
Expand Down
Loading