Skip to content

Commit

Permalink
Refactor processors and move remaining models into "models". (elastic…
Browse files Browse the repository at this point in the history
…#1137) (elastic#1176)

This moves error, transaction, sourcemap and metric models into
directories in the "model" directory. It changes processors to
be simple structs which can potentially override Decode/Validate
(only sourcemaps do this). This removes a bunch of boilerplate
from the processors. This will help move towards decoding and
validating models individually, which is needed in the future.

And healthcheck processor is now a simple http.Handler.
  • Loading branch information
roncohen authored Jul 24, 2018
1 parent ffefc1f commit c7b7585
Show file tree
Hide file tree
Showing 55 changed files with 373 additions and 559 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ BEATS_VERSION?=6.x
NOW=$(shell date -u '+%Y-%m-%dT%H:%M:%S')
GOBUILD_FLAGS=-i -ldflags "-s -X $(BEAT_PATH)/vendor/github.com/elastic/beats/libbeat/version.buildTime=$(NOW) -X $(BEAT_PATH)/vendor/github.com/elastic/beats/libbeat/version.commit=$(COMMIT_ID)"
TESTIFY_TOOL_REPO?=github.com/elastic/beats/vendor/github.com/stretchr/testify/assert
FIELDS_FILE_PATH=processor
FIELDS_FILE_PATH=model
MAGE_IMPORT_PATH=${BEAT_PATH}/vendor/github.com/magefile/mage

# Path to the libbeat Makefile
Expand Down
60 changes: 29 additions & 31 deletions beater/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,10 @@ import (
"github.com/elastic/apm-server/decoder"
"github.com/elastic/apm-server/processor"
perr "github.com/elastic/apm-server/processor/error"
"github.com/elastic/apm-server/processor/healthcheck"
"github.com/elastic/apm-server/processor/metric"
"github.com/elastic/apm-server/processor/sourcemap"
"github.com/elastic/apm-server/processor/transaction"

"github.com/elastic/apm-server/utility"
"github.com/elastic/beats/libbeat/logp"
"github.com/elastic/beats/libbeat/monitoring"
Expand All @@ -62,13 +62,11 @@ const (
supportedMethods = "POST, OPTIONS"
)

type ProcessorFactory func() processor.Processor

type ProcessorHandler func(ProcessorFactory, *Config, reporter) http.Handler
type ProcessorHandler func(processor.Processor, *Config, reporter) http.Handler

type routeMapping struct {
ProcessorHandler
ProcessorFactory
processor.Processor
}

type serverResponse struct {
Expand Down Expand Up @@ -140,25 +138,27 @@ var (
}
}

Routes = map[string]routeMapping{
BackendTransactionsURL: {backendHandler, transaction.NewProcessor},
FrontendTransactionsURL: {frontendHandler, transaction.NewProcessor},
BackendErrorsURL: {backendHandler, perr.NewProcessor},
FrontendErrorsURL: {frontendHandler, perr.NewProcessor},
HealthCheckURL: {healthCheckHandler, healthcheck.NewProcessor},
MetricsURL: {metricsHandler, metric.NewProcessor},
SourcemapsURL: {sourcemapHandler, sourcemap.NewProcessor},
ProcessorRoutes = map[string]routeMapping{
BackendTransactionsURL: {backendHandler, transaction.Processor},
FrontendTransactionsURL: {frontendHandler, transaction.Processor},
BackendErrorsURL: {backendHandler, perr.Processor},
FrontendErrorsURL: {frontendHandler, perr.Processor},
MetricsURL: {metricsHandler, metric.Processor},
SourcemapsURL: {sourcemapHandler, sourcemap.Processor},
}
)

func newMuxer(beaterConfig *Config, report reporter) *http.ServeMux {
mux := http.NewServeMux()
logger := logp.NewLogger("handler")
for path, mapping := range Routes {
for path, mapping := range ProcessorRoutes {
logger.Infof("Path %s added to request handler", path)
mux.Handle(path, mapping.ProcessorHandler(mapping.ProcessorFactory, beaterConfig, report))

mux.Handle(path, mapping.ProcessorHandler(mapping.Processor, beaterConfig, report))
}

mux.Handle(HealthCheckURL, healthCheckHandler())

if beaterConfig.Expvar.isEnabled() {
path := beaterConfig.Expvar.Url
logger.Infof("Path %s added to request handler", path)
Expand Down Expand Up @@ -191,15 +191,15 @@ func concurrencyLimitHandler(beaterConfig *Config, h http.Handler) http.Handler
})
}

func backendHandler(pf ProcessorFactory, beaterConfig *Config, report reporter) http.Handler {
func backendHandler(p processor.Processor, beaterConfig *Config, report reporter) http.Handler {
return logHandler(
concurrencyLimitHandler(beaterConfig,
authHandler(beaterConfig.SecretToken,
processRequestHandler(pf, conf.Config{}, report,
processRequestHandler(p, conf.Config{}, report,
decoder.DecodeSystemData(decoder.DecodeLimitJSONData(beaterConfig.MaxUnzippedSize), beaterConfig.AugmentEnabled)))))
}

func frontendHandler(pf ProcessorFactory, beaterConfig *Config, report reporter) http.Handler {
func frontendHandler(p processor.Processor, beaterConfig *Config, report reporter) http.Handler {
smapper, err := beaterConfig.Frontend.memoizedSmapMapper()
if err != nil {
logp.NewLogger("handler").Error(err.Error())
Expand All @@ -214,30 +214,30 @@ func frontendHandler(pf ProcessorFactory, beaterConfig *Config, report reporter)
concurrencyLimitHandler(beaterConfig,
ipRateLimitHandler(beaterConfig.Frontend.RateLimit,
corsHandler(beaterConfig.Frontend.AllowOrigins,
processRequestHandler(pf, config, report,
processRequestHandler(p, config, report,
decoder.DecodeUserData(decoder.DecodeLimitJSONData(beaterConfig.MaxUnzippedSize), beaterConfig.AugmentEnabled)))))))
}

func metricsHandler(pf ProcessorFactory, beaterConfig *Config, report reporter) http.Handler {
func metricsHandler(p processor.Processor, beaterConfig *Config, report reporter) http.Handler {
return logHandler(
killSwitchHandler(beaterConfig.Metrics.isEnabled(),
authHandler(beaterConfig.SecretToken,
processRequestHandler(pf, conf.Config{}, report,
processRequestHandler(p, conf.Config{}, report,
decoder.DecodeSystemData(decoder.DecodeLimitJSONData(beaterConfig.MaxUnzippedSize), beaterConfig.AugmentEnabled)))))
}

func sourcemapHandler(pf ProcessorFactory, beaterConfig *Config, report reporter) http.Handler {
func sourcemapHandler(p processor.Processor, beaterConfig *Config, report reporter) http.Handler {
smapper, err := beaterConfig.Frontend.memoizedSmapMapper()
if err != nil {
logp.NewLogger("handler").Error(err.Error())
}
return logHandler(
killSwitchHandler(beaterConfig.Frontend.isEnabled(),
authHandler(beaterConfig.SecretToken,
processRequestHandler(pf, conf.Config{SmapMapper: smapper}, report, decoder.DecodeSourcemapFormData))))
processRequestHandler(p, conf.Config{SmapMapper: smapper}, report, decoder.DecodeSourcemapFormData))))
}

func healthCheckHandler(_ ProcessorFactory, _ *Config, _ reporter) http.Handler {
func healthCheckHandler() http.Handler {
return logHandler(
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
sendStatus(w, r, okResponse)
Expand Down Expand Up @@ -383,16 +383,14 @@ func corsHandler(allowedOrigins []string, h http.Handler) http.Handler {
})
}

func processRequestHandler(pf ProcessorFactory, config conf.Config, report reporter, decode decoder.Decoder) http.Handler {
func processRequestHandler(p processor.Processor, config conf.Config, report reporter, decode decoder.Decoder) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
res := processRequest(r, pf, config, report, decode)
res := processRequest(r, p, config, report, decode)
sendStatus(w, r, res)
})
}

func processRequest(r *http.Request, pf ProcessorFactory, config conf.Config, report reporter, decode decoder.Decoder) serverResponse {
processor := pf()

func processRequest(r *http.Request, p processor.Processor, config conf.Config, report reporter, decode decoder.Decoder) serverResponse {
if r.Method != "POST" {
return methodNotAllowedResponse
}
Expand All @@ -406,11 +404,11 @@ func processRequest(r *http.Request, pf ProcessorFactory, config conf.Config, re

}

if err = processor.Validate(data); err != nil {
if err = p.Validate(data); err != nil {
return cannotValidateResponse(err)
}

payload, err := processor.Decode(data)
payload, err := p.Decode(data)
if err != nil {
return cannotDecodeResponse(err)
}
Expand Down
4 changes: 2 additions & 2 deletions beater/pub.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (

"github.com/elastic/apm-agent-go"
"github.com/elastic/apm-server/config"
pr "github.com/elastic/apm-server/processor"
"github.com/elastic/apm-server/model"
"github.com/elastic/beats/libbeat/beat"
)

Expand All @@ -47,7 +47,7 @@ type publisher struct {
}

type pendingReq struct {
payload pr.Payload
payload model.Payload
config config.Config
trace bool
}
Expand Down
File renamed without changes.
20 changes: 20 additions & 0 deletions processor/error/event.go → model/error/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,12 +28,32 @@ import (
"strconv"
"time"

"github.com/santhosh-tekuri/jsonschema"

"github.com/elastic/apm-server/config"
m "github.com/elastic/apm-server/model"
"github.com/elastic/apm-server/model/error/generated/schema"
"github.com/elastic/apm-server/utility"
"github.com/elastic/apm-server/validation"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/monitoring"
)

var (
Metrics = monitoring.Default.NewRegistry("apm-server.processor.error", monitoring.PublishExpvar)
)

const (
processorName = "error"
errorDocType = "error"
)

var cachedSchema = validation.CreateSchema(schema.PayloadSchema, processorName)

func PayloadSchema() *jsonschema.Schema {
return cachedSchema
}

type Event struct {
Id *string
Culprit *string
Expand Down
File renamed without changes.
File renamed without changes.
10 changes: 5 additions & 5 deletions processor/error/payload.go → model/error/payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,10 @@ import (
)

var (
transformations = monitoring.NewInt(errorMetrics, "transformations")
errorCounter = monitoring.NewInt(errorMetrics, "errors")
stacktraceCounter = monitoring.NewInt(errorMetrics, "stacktraces")
frameCounter = monitoring.NewInt(errorMetrics, "frames")
transformations = monitoring.NewInt(Metrics, "transformations")
errorCounter = monitoring.NewInt(Metrics, "errors")
stacktraceCounter = monitoring.NewInt(Metrics, "stacktraces")
frameCounter = monitoring.NewInt(Metrics, "frames")
processorEntry = common.MapStr{"name": processorName, "event": errorDocType}
)

Expand All @@ -43,7 +43,7 @@ type Payload struct {
Events []*Event
}

func DecodePayload(raw map[string]interface{}) (*Payload, error) {
func DecodePayload(raw map[string]interface{}) (m.Payload, error) {
if raw == nil {
return nil, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestPayloadDecode(t *testing.T) {
for _, test := range []struct {
input map[string]interface{}
err error
p *Payload
p m.Payload
}{
{input: nil, err: nil, p: nil},
{
Expand Down
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
23 changes: 21 additions & 2 deletions processor/metric/payload.go → model/metric/payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,12 @@ import (
"fmt"
"time"

"github.com/santhosh-tekuri/jsonschema"

"github.com/elastic/apm-server/model"
"github.com/elastic/apm-server/model/metric/generated/schema"
"github.com/elastic/apm-server/validation"

"github.com/pkg/errors"

"github.com/elastic/apm-server/config"
Expand All @@ -32,11 +38,24 @@ import (
"github.com/elastic/beats/libbeat/monitoring"
)

const (
processorName = "metric"
docType = "metric"
)

var (
transformations = monitoring.NewInt(metricMetrics, "transformations")
transformations = monitoring.NewInt(Metrics, "transformations")
processorEntry = common.MapStr{"name": processorName, "event": docType}

Metrics = monitoring.Default.NewRegistry("apm-server.processor.metric", monitoring.PublishExpvar)
)

var cachedSchema = validation.CreateSchema(schema.PayloadSchema, processorName)

func PayloadSchema() *jsonschema.Schema {
return cachedSchema
}

type sample interface {
transform(common.MapStr) error
}
Expand Down Expand Up @@ -90,7 +109,7 @@ type metricDecoder struct {
*utility.ManualDecoder
}

func DecodePayload(raw map[string]interface{}) (*Payload, error) {
func DecodePayload(raw map[string]interface{}) (model.Payload, error) {
if raw == nil {
return nil, nil
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/apm-server/config"
m "github.com/elastic/apm-server/model"
Expand Down Expand Up @@ -248,7 +249,12 @@ func TestPayloadDecode(t *testing.T) {
},
},
} {
payload, err := DecodePayload(test.input)
payloadInterface, err := DecodePayload(test.input)

payload, ok := payloadInterface.(*Payload)
if payloadInterface != nil {
require.True(t, ok)
}

// compare metrics separately as they may be ordered differently
if test.p != nil && test.p.Metrics != nil {
Expand Down
File renamed without changes.
17 changes: 5 additions & 12 deletions processor/error/processor_test.go → model/payload.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,13 @@
// specific language governing permissions and limitations
// under the License.

package error
package model

import (
"testing"

"github.com/stretchr/testify/assert"

pr "github.com/elastic/apm-server/processor"
"github.com/elastic/apm-server/config"
"github.com/elastic/beats/libbeat/beat"
)

func TestImplementProcessorInterface(t *testing.T) {
p := NewProcessor()
assert.NotNil(t, p)
_, ok := p.(pr.Processor)
assert.True(t, ok)
assert.IsType(t, &processor{}, p)
type Payload interface {
Transform(config.Config) []beat.Event
}
File renamed without changes.
File renamed without changes.
Loading

0 comments on commit c7b7585

Please sign in to comment.