Skip to content

Commit

Permalink
Live: test pipeline convert endpoint (grafana#39480)
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia authored Sep 30, 2021
1 parent 14d90b0 commit 29f3e17
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 5 deletions.
1 change: 1 addition & 0 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,7 @@ func (hs *HTTPServer) registerRoutes() {
// POST Live data to be processed according to channel rules.
liveRoute.Post("/push/:streamId/:path", hs.LivePushGateway.HandlePath)
liveRoute.Get("/channel-rules", routing.Wrap(hs.Live.HandleChannelRulesListHTTP), reqOrgAdmin)
liveRoute.Post("/pipeline-convert-test", routing.Wrap(hs.Live.HandlePipelineConvertTestHTTP), reqOrgAdmin)
liveRoute.Post("/channel-rules", routing.Wrap(hs.Live.HandleChannelRulesPostHTTP), reqOrgAdmin)
liveRoute.Put("/channel-rules", routing.Wrap(hs.Live.HandleChannelRulesPutHTTP), reqOrgAdmin)
liveRoute.Delete("/channel-rules", routing.Wrap(hs.Live.HandleChannelRulesDeleteHTTP), reqOrgAdmin)
Expand Down
79 changes: 79 additions & 0 deletions pkg/services/live/live.go
Original file line number Diff line number Diff line change
Expand Up @@ -885,6 +885,85 @@ func (g *GrafanaLive) HandleChannelRulesListHTTP(c *models.ReqContext) response.
})
}

type ConvertDryRunRequest struct {
ChannelRules []pipeline.ChannelRule `json:"channelRules"`
Channel string `json:"channel"`
Data string `json:"data"`
}

type ConvertDryRunResponse struct {
ChannelFrames []*pipeline.ChannelFrame `json:"channelFrames"`
}

type DryRunRuleStorage struct {
ChannelRules []pipeline.ChannelRule
}

func (s *DryRunRuleStorage) CreateChannelRule(_ context.Context, _ int64, _ pipeline.ChannelRule) (pipeline.ChannelRule, error) {
return pipeline.ChannelRule{}, errors.New("not implemented by dry run rule storage")
}

func (s *DryRunRuleStorage) UpdateChannelRule(_ context.Context, _ int64, _ pipeline.ChannelRule) (pipeline.ChannelRule, error) {
return pipeline.ChannelRule{}, errors.New("not implemented by dry run rule storage")
}

func (s *DryRunRuleStorage) DeleteChannelRule(_ context.Context, _ int64, _ string) error {
return errors.New("not implemented by dry run rule storage")
}

func (s *DryRunRuleStorage) ListRemoteWriteBackends(_ context.Context, _ int64) ([]pipeline.RemoteWriteBackend, error) {
return nil, nil
}

func (s *DryRunRuleStorage) ListChannelRules(_ context.Context, _ int64) ([]pipeline.ChannelRule, error) {
return s.ChannelRules, nil
}

// HandlePipelineConvertTestHTTP ...
func (g *GrafanaLive) HandlePipelineConvertTestHTTP(c *models.ReqContext) response.Response {
body, err := ioutil.ReadAll(c.Req.Body)
if err != nil {
return response.Error(http.StatusInternalServerError, "Error reading body", err)
}
var req ConvertDryRunRequest
err = json.Unmarshal(body, &req)
if err != nil {
return response.Error(http.StatusBadRequest, "Error decoding request", err)
}
storage := &DryRunRuleStorage{
ChannelRules: req.ChannelRules,
}
builder := &pipeline.StorageRuleBuilder{
Node: g.node,
ManagedStream: g.ManagedStreamRunner,
FrameStorage: pipeline.NewFrameStorage(),
RuleStorage: storage,
ChannelHandlerGetter: g,
}
channelRuleGetter := pipeline.NewCacheSegmentedTree(builder)
pipe, err := pipeline.New(channelRuleGetter)
if err != nil {
return response.Error(http.StatusInternalServerError, "Error creating pipeline", err)
}
rule, ok, err := channelRuleGetter.Get(c.OrgId, req.Channel)
if err != nil {
return response.Error(http.StatusInternalServerError, "Error getting channel rule", err)
}
if !ok {
return response.Error(http.StatusNotFound, "No rule found", nil)
}
channelFrames, ok, err := pipe.DataToChannelFrames(c.Req.Context(), *rule, c.OrgId, req.Channel, []byte(req.Data))
if err != nil {
return response.Error(http.StatusInternalServerError, "Error converting data", err)
}
if !ok {
return response.Error(http.StatusNotFound, "No converter found", nil)
}
return response.JSON(http.StatusOK, ConvertDryRunResponse{
ChannelFrames: channelFrames,
})
}

// HandleChannelRulesPostHTTP ...
func (g *GrafanaLive) HandleChannelRulesPostHTTP(c *models.ReqContext) response.Response {
body, err := ioutil.ReadAll(c.Req.Body)
Expand Down
9 changes: 4 additions & 5 deletions pkg/services/live/pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ import (
// then frame processing will be redirected to a corresponding channel rule.
// TODO: avoid recursion, increment a counter while frame travels over pipeline steps, make it configurable.
type ChannelFrame struct {
Channel string
Frame *data.Frame
Channel string `json:"channel"`
Frame *data.Frame `json:"frame"`
}

// Vars has some helpful things pipeline entities could use.
Expand Down Expand Up @@ -111,7 +111,6 @@ type Pipeline struct {

// New creates new Pipeline.
func New(ruleGetter ChannelRuleGetter) (*Pipeline, error) {
logger.Info("Live pipeline initialization")
p := &Pipeline{
ruleGetter: ruleGetter,
}
Expand All @@ -133,7 +132,7 @@ func (p *Pipeline) ProcessInput(ctx context.Context, orgID int64, channelID stri
if !ok {
return false, nil
}
channelFrames, ok, err := p.dataToChannelFrames(ctx, *rule, orgID, channelID, body)
channelFrames, ok, err := p.DataToChannelFrames(ctx, *rule, orgID, channelID, body)
if err != nil {
return false, err
}
Expand All @@ -147,7 +146,7 @@ func (p *Pipeline) ProcessInput(ctx context.Context, orgID int64, channelID stri
return true, nil
}

func (p *Pipeline) dataToChannelFrames(ctx context.Context, rule LiveChannelRule, orgID int64, channelID string, body []byte) ([]*ChannelFrame, bool, error) {
func (p *Pipeline) DataToChannelFrames(ctx context.Context, rule LiveChannelRule, orgID int64, channelID string, body []byte) ([]*ChannelFrame, bool, error) {
if rule.Converter == nil {
return nil, false, nil
}
Expand Down

0 comments on commit 29f3e17

Please sign in to comment.