From b528ab5ea092f06f165c02125de2b7508f43b531 Mon Sep 17 00:00:00 2001 From: Marcus Efraimsson Date: Fri, 5 Jul 2024 16:21:51 +0200 Subject: [PATCH] Handler middlewares --- backend/handler.go | 70 ++++++++ backend/handler_middleware.go | 158 +++++++++++++++++ backend/handler_middleware_test.go | 234 +++++++++++++++++++++++++ backend/handlertest/handlertest.go | 271 +++++++++++++++++++++++++++++ backend/stream.go | 78 +++++++-- 5 files changed, 795 insertions(+), 16 deletions(-) create mode 100644 backend/handler.go create mode 100644 backend/handler_middleware.go create mode 100644 backend/handler_middleware_test.go create mode 100644 backend/handlertest/handlertest.go diff --git a/backend/handler.go b/backend/handler.go new file mode 100644 index 000000000..d79b3f630 --- /dev/null +++ b/backend/handler.go @@ -0,0 +1,70 @@ +package backend + +import "context" + +// Handler interface for all handlers. +type Handler interface { + QueryDataHandler + CheckHealthHandler + CallResourceHandler + CollectMetricsHandler + StreamHandler + AdmissionHandler + ConversionHandler +} + +var _ = Handler(&BaseHandler{}) + +// BaseHandler base handler provides a base implementation of Handler interface +// passing the request down the chain to next Handler. +// This allows handlers to avoid implementing the full Handler interface. +type BaseHandler struct { + next Handler +} + +// NewBaseHandler creates a new BaseHandler. +func NewBaseHandler(next Handler) BaseHandler { + return BaseHandler{ + next: next, + } +} + +func (m BaseHandler) QueryData(ctx context.Context, req *QueryDataRequest) (*QueryDataResponse, error) { + return m.next.QueryData(ctx, req) +} + +func (m BaseHandler) CallResource(ctx context.Context, req *CallResourceRequest, sender CallResourceResponseSender) error { + return m.next.CallResource(ctx, req, sender) +} + +func (m BaseHandler) CheckHealth(ctx context.Context, req *CheckHealthRequest) (*CheckHealthResult, error) { + return m.next.CheckHealth(ctx, req) +} + +func (m BaseHandler) CollectMetrics(ctx context.Context, req *CollectMetricsRequest) (*CollectMetricsResult, error) { + return m.next.CollectMetrics(ctx, req) +} + +func (m BaseHandler) SubscribeStream(ctx context.Context, req *SubscribeStreamRequest) (*SubscribeStreamResponse, error) { + return m.next.SubscribeStream(ctx, req) +} + +func (m BaseHandler) PublishStream(ctx context.Context, req *PublishStreamRequest) (*PublishStreamResponse, error) { + return m.next.PublishStream(ctx, req) +} + +func (m BaseHandler) RunStream(ctx context.Context, req *RunStreamRequest, sender *StreamSender) error { + return m.next.RunStream(ctx, req, sender) +} + +func (m BaseHandler) ValidateAdmission(ctx context.Context, req *AdmissionRequest) (*ValidationResponse, error) { + return m.next.ValidateAdmission(ctx, req) +} + +func (m *BaseHandler) MutateAdmission(ctx context.Context, req *AdmissionRequest) (*MutationResponse, error) { + return m.next.MutateAdmission(ctx, req) +} + +func (m *BaseHandler) ConvertObjects(ctx context.Context, req *ConversionRequest) (*ConversionResponse, error) { + return m.next.ConvertObjects(ctx, req) +} diff --git a/backend/handler_middleware.go b/backend/handler_middleware.go new file mode 100644 index 000000000..9cc597230 --- /dev/null +++ b/backend/handler_middleware.go @@ -0,0 +1,158 @@ +package backend + +import ( + "context" + "errors" +) + +var ( + errNilRequest = errors.New("req cannot be nil") + errNilSender = errors.New("sender cannot be nil") +) + +// HandlerMiddleware is an interface representing the ability to create a middleware +// that implements the Handler interface. +type HandlerMiddleware interface { + // CreateHandlerMiddleware creates a new Handler by decorating next Handler. + CreateHandlerMiddleware(next Handler) Handler +} + +// The HandlerMiddlewareFunc type is an adapter to allow the use of ordinary +// functions as HandlerMiddleware's. If f is a function with the appropriate +// signature, HandlerMiddlewareFunc(f) is a HandlerMiddleware that calls f. +type HandlerMiddlewareFunc func(next Handler) Handler + +// CreateHandlerMiddleware implements the HandlerMiddleware interface. +func (fn HandlerMiddlewareFunc) CreateHandlerMiddleware(next Handler) Handler { + return fn(next) +} + +// MiddlewareHandler decorates a Handler with HandlerMiddleware's. +type MiddlewareHandler struct { + handler Handler +} + +// HandlerFromMiddlewares creates a new MiddlewareHandler implementing Handler that decorates finalHandler with middlewares. +func HandlerFromMiddlewares(finalHandler Handler, middlewares ...HandlerMiddleware) (*MiddlewareHandler, error) { + if finalHandler == nil { + return nil, errors.New("finalHandler cannot be nil") + } + + return &MiddlewareHandler{ + handler: handlerFromMiddlewares(middlewares, finalHandler), + }, nil +} + +func (h *MiddlewareHandler) QueryData(ctx context.Context, req *QueryDataRequest) (*QueryDataResponse, error) { + if req == nil { + return nil, errNilRequest + } + + return h.handler.QueryData(ctx, req) +} + +func (h MiddlewareHandler) CallResource(ctx context.Context, req *CallResourceRequest, sender CallResourceResponseSender) error { + if req == nil { + return errNilRequest + } + + if sender == nil { + return errNilSender + } + + return h.handler.CallResource(ctx, req, sender) +} + +func (h MiddlewareHandler) CollectMetrics(ctx context.Context, req *CollectMetricsRequest) (*CollectMetricsResult, error) { + if req == nil { + return nil, errNilRequest + } + + return h.handler.CollectMetrics(ctx, req) +} + +func (h MiddlewareHandler) CheckHealth(ctx context.Context, req *CheckHealthRequest) (*CheckHealthResult, error) { + if req == nil { + return nil, errNilRequest + } + + return h.handler.CheckHealth(ctx, req) +} + +func (h MiddlewareHandler) SubscribeStream(ctx context.Context, req *SubscribeStreamRequest) (*SubscribeStreamResponse, error) { + if req == nil { + return nil, errNilRequest + } + + return h.handler.SubscribeStream(ctx, req) +} + +func (h MiddlewareHandler) PublishStream(ctx context.Context, req *PublishStreamRequest) (*PublishStreamResponse, error) { + if req == nil { + return nil, errNilRequest + } + + return h.handler.PublishStream(ctx, req) +} + +func (h MiddlewareHandler) RunStream(ctx context.Context, req *RunStreamRequest, sender *StreamSender) error { + if req == nil { + return errNilRequest + } + + if sender == nil { + return errors.New("sender cannot be nil") + } + + return h.handler.RunStream(ctx, req, sender) +} + +func (h MiddlewareHandler) ValidateAdmission(ctx context.Context, req *AdmissionRequest) (*ValidationResponse, error) { + if req == nil { + return nil, errNilRequest + } + + return h.handler.ValidateAdmission(ctx, req) +} + +func (h MiddlewareHandler) MutateAdmission(ctx context.Context, req *AdmissionRequest) (*MutationResponse, error) { + if req == nil { + return nil, errNilRequest + } + + return h.handler.MutateAdmission(ctx, req) +} + +func (h MiddlewareHandler) ConvertObjects(ctx context.Context, req *ConversionRequest) (*ConversionResponse, error) { + if req == nil { + return nil, errNilRequest + } + + return h.handler.ConvertObjects(ctx, req) +} + +func handlerFromMiddlewares(middlewares []HandlerMiddleware, finalHandler Handler) Handler { + if len(middlewares) == 0 { + return finalHandler + } + + reversed := reverseMiddlewares(middlewares) + next := finalHandler + + for _, m := range reversed { + next = m.CreateHandlerMiddleware(next) + } + + return next +} + +func reverseMiddlewares(middlewares []HandlerMiddleware) []HandlerMiddleware { + reversed := make([]HandlerMiddleware, len(middlewares)) + copy(reversed, middlewares) + + for i, j := 0, len(reversed)-1; i < j; i, j = i+1, j-1 { + reversed[i], reversed[j] = reversed[j], reversed[i] + } + + return reversed +} diff --git a/backend/handler_middleware_test.go b/backend/handler_middleware_test.go new file mode 100644 index 000000000..7320e16df --- /dev/null +++ b/backend/handler_middleware_test.go @@ -0,0 +1,234 @@ +package backend_test + +import ( + "context" + "fmt" + "testing" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana-plugin-sdk-go/backend/handlertest" + "github.com/stretchr/testify/require" +) + +func TestHandlerFromMiddlewares(t *testing.T) { + var queryDataCalled bool + var callResourceCalled bool + var checkHealthCalled bool + var collectMetricsCalled bool + var subscribeStreamCalled bool + var publishStreamCalled bool + var runStreamCalled bool + var mutateAdmissionCalled bool + var validateAdmissionCalled bool + var convertObjectCalled bool + + c := &handlertest.Handler{ + QueryDataFunc: func(_ context.Context, _ *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { + queryDataCalled = true + return nil, nil + }, + CallResourceFunc: func(_ context.Context, _ *backend.CallResourceRequest, _ backend.CallResourceResponseSender) error { + callResourceCalled = true + return nil + }, + CheckHealthFunc: func(_ context.Context, _ *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { + checkHealthCalled = true + return nil, nil + }, + CollectMetricsFunc: func(_ context.Context, _ *backend.CollectMetricsRequest) (*backend.CollectMetricsResult, error) { + collectMetricsCalled = true + return nil, nil + }, + SubscribeStreamFunc: func(_ context.Context, _ *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) { + subscribeStreamCalled = true + return nil, nil + }, + PublishStreamFunc: func(_ context.Context, _ *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) { + publishStreamCalled = true + return nil, nil + }, + RunStreamFunc: func(_ context.Context, _ *backend.RunStreamRequest, _ *backend.StreamSender) error { + runStreamCalled = true + return nil + }, + MutateAdmissionFunc: func(_ context.Context, _ *backend.AdmissionRequest) (*backend.MutationResponse, error) { + mutateAdmissionCalled = true + return nil, nil + }, + ValidateAdmissionFunc: func(_ context.Context, _ *backend.AdmissionRequest) (*backend.ValidationResponse, error) { + validateAdmissionCalled = true + return nil, nil + }, + ConvertObjectsFunc: func(_ context.Context, _ *backend.ConversionRequest) (*backend.ConversionResponse, error) { + convertObjectCalled = true + return nil, nil + }, + } + + require.NotNil(t, c) + + ctx := MiddlewareScenarioContext{} + + mwOne := ctx.NewMiddleware("mw1") + mwTwo := ctx.NewMiddleware("mw2") + + d, err := backend.HandlerFromMiddlewares(c, mwOne, mwTwo) + require.NoError(t, err) + require.NotNil(t, d) + + _, _ = d.QueryData(context.Background(), &backend.QueryDataRequest{}) + require.True(t, queryDataCalled) + + sender := backend.CallResourceResponseSenderFunc(func(_ *backend.CallResourceResponse) error { + return nil + }) + + _ = d.CallResource(context.Background(), &backend.CallResourceRequest{}, sender) + require.True(t, callResourceCalled) + + _, _ = d.CheckHealth(context.Background(), &backend.CheckHealthRequest{}) + require.True(t, checkHealthCalled) + + _, _ = d.CollectMetrics(context.Background(), &backend.CollectMetricsRequest{}) + require.True(t, collectMetricsCalled) + + _, _ = d.SubscribeStream(context.Background(), &backend.SubscribeStreamRequest{}) + require.True(t, subscribeStreamCalled) + + _, _ = d.PublishStream(context.Background(), &backend.PublishStreamRequest{}) + require.True(t, publishStreamCalled) + + streamSender := backend.NewStreamSender(nil) + _ = d.RunStream(context.Background(), &backend.RunStreamRequest{}, streamSender) + require.True(t, runStreamCalled) + + _, _ = d.MutateAdmission(context.Background(), &backend.AdmissionRequest{}) + require.True(t, mutateAdmissionCalled) + + _, _ = d.ValidateAdmission(context.Background(), &backend.AdmissionRequest{}) + require.True(t, validateAdmissionCalled) + + _, _ = d.ConvertObjects(context.Background(), &backend.ConversionRequest{}) + require.True(t, convertObjectCalled) + + require.Len(t, ctx.QueryDataCallChain, 4) + require.EqualValues(t, []string{"before mw1", "before mw2", "after mw2", "after mw1"}, ctx.QueryDataCallChain) + require.Len(t, ctx.CallResourceCallChain, 4) + require.EqualValues(t, []string{"before mw1", "before mw2", "after mw2", "after mw1"}, ctx.CallResourceCallChain) + require.Len(t, ctx.CheckHealthCallChain, 4) + require.EqualValues(t, []string{"before mw1", "before mw2", "after mw2", "after mw1"}, ctx.CheckHealthCallChain) + require.Len(t, ctx.CollectMetricsCallChain, 4) + require.EqualValues(t, []string{"before mw1", "before mw2", "after mw2", "after mw1"}, ctx.CollectMetricsCallChain) + require.Len(t, ctx.SubscribeStreamCallChain, 4) + require.EqualValues(t, []string{"before mw1", "before mw2", "after mw2", "after mw1"}, ctx.SubscribeStreamCallChain) + require.Len(t, ctx.PublishStreamCallChain, 4) + require.EqualValues(t, []string{"before mw1", "before mw2", "after mw2", "after mw1"}, ctx.PublishStreamCallChain) + require.Len(t, ctx.RunStreamCallChain, 4) + require.EqualValues(t, []string{"before mw1", "before mw2", "after mw2", "after mw1"}, ctx.RunStreamCallChain) + require.Len(t, ctx.MutateAdmissionCallChain, 4) + require.EqualValues(t, []string{"before mw1", "before mw2", "after mw2", "after mw1"}, ctx.MutateAdmissionCallChain) + require.Len(t, ctx.ValidateAdmissionCallChain, 4) + require.EqualValues(t, []string{"before mw1", "before mw2", "after mw2", "after mw1"}, ctx.ValidateAdmissionCallChain) + require.Len(t, ctx.ConvertObjectCallChain, 4) + require.EqualValues(t, []string{"before mw1", "before mw2", "after mw2", "after mw1"}, ctx.ConvertObjectCallChain) +} + +type MiddlewareScenarioContext struct { + QueryDataCallChain []string + CallResourceCallChain []string + CollectMetricsCallChain []string + CheckHealthCallChain []string + SubscribeStreamCallChain []string + PublishStreamCallChain []string + RunStreamCallChain []string + InstanceSettingsCallChain []string + ValidateAdmissionCallChain []string + MutateAdmissionCallChain []string + ConvertObjectCallChain []string +} + +func (ctx *MiddlewareScenarioContext) NewMiddleware(name string) backend.HandlerMiddleware { + return backend.HandlerMiddlewareFunc(func(next backend.Handler) backend.Handler { + return &TestMiddleware{ + next: next, + Name: name, + sCtx: ctx, + } + }) +} + +type TestMiddleware struct { + next backend.Handler + sCtx *MiddlewareScenarioContext + Name string +} + +func (m *TestMiddleware) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { + m.sCtx.QueryDataCallChain = append(m.sCtx.QueryDataCallChain, fmt.Sprintf("before %s", m.Name)) + res, err := m.next.QueryData(ctx, req) + m.sCtx.QueryDataCallChain = append(m.sCtx.QueryDataCallChain, fmt.Sprintf("after %s", m.Name)) + return res, err +} + +func (m *TestMiddleware) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error { + m.sCtx.CallResourceCallChain = append(m.sCtx.CallResourceCallChain, fmt.Sprintf("before %s", m.Name)) + err := m.next.CallResource(ctx, req, sender) + m.sCtx.CallResourceCallChain = append(m.sCtx.CallResourceCallChain, fmt.Sprintf("after %s", m.Name)) + return err +} + +func (m *TestMiddleware) CollectMetrics(ctx context.Context, req *backend.CollectMetricsRequest) (*backend.CollectMetricsResult, error) { + m.sCtx.CollectMetricsCallChain = append(m.sCtx.CollectMetricsCallChain, fmt.Sprintf("before %s", m.Name)) + res, err := m.next.CollectMetrics(ctx, req) + m.sCtx.CollectMetricsCallChain = append(m.sCtx.CollectMetricsCallChain, fmt.Sprintf("after %s", m.Name)) + return res, err +} + +func (m *TestMiddleware) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { + m.sCtx.CheckHealthCallChain = append(m.sCtx.CheckHealthCallChain, fmt.Sprintf("before %s", m.Name)) + res, err := m.next.CheckHealth(ctx, req) + m.sCtx.CheckHealthCallChain = append(m.sCtx.CheckHealthCallChain, fmt.Sprintf("after %s", m.Name)) + return res, err +} + +func (m *TestMiddleware) SubscribeStream(ctx context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) { + m.sCtx.SubscribeStreamCallChain = append(m.sCtx.SubscribeStreamCallChain, fmt.Sprintf("before %s", m.Name)) + res, err := m.next.SubscribeStream(ctx, req) + m.sCtx.SubscribeStreamCallChain = append(m.sCtx.SubscribeStreamCallChain, fmt.Sprintf("after %s", m.Name)) + return res, err +} + +func (m *TestMiddleware) PublishStream(ctx context.Context, req *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) { + m.sCtx.PublishStreamCallChain = append(m.sCtx.PublishStreamCallChain, fmt.Sprintf("before %s", m.Name)) + res, err := m.next.PublishStream(ctx, req) + m.sCtx.PublishStreamCallChain = append(m.sCtx.PublishStreamCallChain, fmt.Sprintf("after %s", m.Name)) + return res, err +} + +func (m *TestMiddleware) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error { + m.sCtx.RunStreamCallChain = append(m.sCtx.RunStreamCallChain, fmt.Sprintf("before %s", m.Name)) + err := m.next.RunStream(ctx, req, sender) + m.sCtx.RunStreamCallChain = append(m.sCtx.RunStreamCallChain, fmt.Sprintf("after %s", m.Name)) + return err +} + +func (m *TestMiddleware) ValidateAdmission(ctx context.Context, req *backend.AdmissionRequest) (*backend.ValidationResponse, error) { + m.sCtx.ValidateAdmissionCallChain = append(m.sCtx.ValidateAdmissionCallChain, fmt.Sprintf("before %s", m.Name)) + res, err := m.next.ValidateAdmission(ctx, req) + m.sCtx.ValidateAdmissionCallChain = append(m.sCtx.ValidateAdmissionCallChain, fmt.Sprintf("after %s", m.Name)) + return res, err +} + +func (m *TestMiddleware) MutateAdmission(ctx context.Context, req *backend.AdmissionRequest) (*backend.MutationResponse, error) { + m.sCtx.MutateAdmissionCallChain = append(m.sCtx.MutateAdmissionCallChain, fmt.Sprintf("before %s", m.Name)) + res, err := m.next.MutateAdmission(ctx, req) + m.sCtx.MutateAdmissionCallChain = append(m.sCtx.MutateAdmissionCallChain, fmt.Sprintf("after %s", m.Name)) + return res, err +} + +func (m *TestMiddleware) ConvertObjects(ctx context.Context, req *backend.ConversionRequest) (*backend.ConversionResponse, error) { + m.sCtx.ConvertObjectCallChain = append(m.sCtx.ConvertObjectCallChain, fmt.Sprintf("before %s", m.Name)) + res, err := m.next.ConvertObjects(ctx, req) + m.sCtx.ConvertObjectCallChain = append(m.sCtx.ConvertObjectCallChain, fmt.Sprintf("after %s", m.Name)) + return res, err +} diff --git a/backend/handlertest/handlertest.go b/backend/handlertest/handlertest.go new file mode 100644 index 000000000..ecf7f936e --- /dev/null +++ b/backend/handlertest/handlertest.go @@ -0,0 +1,271 @@ +package handlertest + +import ( + "context" + "testing" + + "github.com/grafana/grafana-plugin-sdk-go/backend" +) + +var _ backend.Handler = &Handler{} + +// Handler a test handler implementing backend.Handler. +type Handler struct { + QueryDataFunc backend.QueryDataHandlerFunc + CallResourceFunc backend.CallResourceHandlerFunc + CheckHealthFunc backend.CheckHealthHandlerFunc + CollectMetricsFunc backend.CollectMetricsHandlerFunc + SubscribeStreamFunc backend.SubscribeStreamHandlerFunc + PublishStreamFunc backend.PublishStreamHandlerFunc + RunStreamFunc backend.RunStreamHandlerFunc + MutateAdmissionFunc backend.MutateAdmissionFunc + ValidateAdmissionFunc backend.ValidateAdmissionFunc + ConvertObjectsFunc backend.ConvertObjectsFunc +} + +func (h Handler) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { + if h.QueryDataFunc != nil { + return h.QueryDataFunc(ctx, req) + } + + return nil, nil +} + +func (h Handler) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error { + if h.CallResourceFunc != nil { + return h.CallResourceFunc(ctx, req, sender) + } + + return nil +} + +func (h Handler) CheckHealth(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { + if h.CheckHealthFunc != nil { + return h.CheckHealthFunc(ctx, req) + } + + return nil, nil +} + +func (h Handler) CollectMetrics(ctx context.Context, req *backend.CollectMetricsRequest) (*backend.CollectMetricsResult, error) { + if h.CollectMetricsFunc != nil { + return h.CollectMetricsFunc(ctx, req) + } + + return nil, nil +} + +func (h Handler) SubscribeStream(ctx context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) { + if h.SubscribeStreamFunc != nil { + return h.SubscribeStreamFunc(ctx, req) + } + + return nil, nil +} + +func (h Handler) PublishStream(ctx context.Context, req *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) { + if h.PublishStreamFunc != nil { + return h.PublishStreamFunc(ctx, req) + } + + return nil, nil +} + +func (h Handler) RunStream(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error { + if h.RunStreamFunc != nil { + return h.RunStreamFunc(ctx, req, sender) + } + + return nil +} + +func (h Handler) ValidateAdmission(ctx context.Context, req *backend.AdmissionRequest) (*backend.ValidationResponse, error) { + if h.ValidateAdmissionFunc != nil { + return h.ValidateAdmissionFunc(ctx, req) + } + + return nil, nil +} + +func (h Handler) MutateAdmission(ctx context.Context, req *backend.AdmissionRequest) (*backend.MutationResponse, error) { + if h.MutateAdmissionFunc != nil { + return h.MutateAdmissionFunc(ctx, req) + } + + return nil, nil +} + +func (h Handler) ConvertObjects(ctx context.Context, req *backend.ConversionRequest) (*backend.ConversionResponse, error) { + if h.ConvertObjectsFunc != nil { + return h.ConvertObjectsFunc(ctx, req) + } + + return nil, nil +} + +type HandlerMiddlewareTest struct { + T *testing.T + TestHandler *Handler + Middlewares []backend.HandlerMiddleware + MiddlewareHandler *backend.MiddlewareHandler + QueryDataReq *backend.QueryDataRequest + QueryDataCtx context.Context + CallResourceReq *backend.CallResourceRequest + CallResourceCtx context.Context + CheckHealthReq *backend.CheckHealthRequest + CheckHealthCtx context.Context + CollectMetricsReq *backend.CollectMetricsRequest + CollectMetricsCtx context.Context + SubscribeStreamReq *backend.SubscribeStreamRequest + SubscribeStreamCtx context.Context + PublishStreamReq *backend.PublishStreamRequest + PublishStreamCtx context.Context + RunStreamReq *backend.RunStreamRequest + RunStreamCtx context.Context + MutateAdmissionReq *backend.AdmissionRequest + MutateAdmissionCtx context.Context + ValidationAdmissionReq *backend.AdmissionRequest + ValidateAdmissionCtx context.Context + ConvertObjectReq *backend.ConversionRequest + ConvertObjectCtx context.Context + + // When CallResource is called, the sender will be called with these values + callResourceResponses []*backend.CallResourceResponse + runStreamResponseBytes [][]byte + runStreamResponseJSONBytes [][]byte +} + +type HandlerMiddlewareTestOption func(*HandlerMiddlewareTest) + +func NewHandlerMiddlewareTest(t *testing.T, opts ...HandlerMiddlewareTestOption) *HandlerMiddlewareTest { + t.Helper() + + cdt := &HandlerMiddlewareTest{ + T: t, + } + cdt.TestHandler = &Handler{ + QueryDataFunc: func(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) { + cdt.QueryDataReq = req + cdt.QueryDataCtx = ctx + return nil, nil + }, + CallResourceFunc: func(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error { + cdt.CallResourceReq = req + cdt.CallResourceCtx = ctx + if cdt.callResourceResponses != nil { + for _, r := range cdt.callResourceResponses { + if err := sender.Send(r); err != nil { + return err + } + } + } + return nil + }, + CheckHealthFunc: func(ctx context.Context, req *backend.CheckHealthRequest) (*backend.CheckHealthResult, error) { + cdt.CheckHealthReq = req + cdt.CheckHealthCtx = ctx + return nil, nil + }, + CollectMetricsFunc: func(ctx context.Context, req *backend.CollectMetricsRequest) (*backend.CollectMetricsResult, error) { + cdt.CollectMetricsReq = req + cdt.CollectMetricsCtx = ctx + return nil, nil + }, + SubscribeStreamFunc: func(ctx context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) { + cdt.SubscribeStreamReq = req + cdt.SubscribeStreamCtx = ctx + return nil, nil + }, + PublishStreamFunc: func(ctx context.Context, req *backend.PublishStreamRequest) (*backend.PublishStreamResponse, error) { + cdt.PublishStreamReq = req + cdt.PublishStreamCtx = ctx + return nil, nil + }, + RunStreamFunc: func(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender) error { + cdt.RunStreamReq = req + cdt.RunStreamCtx = ctx + + if cdt.runStreamResponseBytes != nil { + for _, b := range cdt.runStreamResponseBytes { + if err := sender.SendBytes(b); err != nil { + return err + } + } + } + + if cdt.runStreamResponseJSONBytes != nil { + for _, b := range cdt.runStreamResponseJSONBytes { + if err := sender.SendJSON(b); err != nil { + return err + } + } + } + + return nil + }, + ValidateAdmissionFunc: func(ctx context.Context, ar *backend.AdmissionRequest) (*backend.ValidationResponse, error) { + cdt.ValidationAdmissionReq = ar + cdt.ValidateAdmissionCtx = ctx + return nil, nil + }, + MutateAdmissionFunc: func(ctx context.Context, ar *backend.AdmissionRequest) (*backend.MutationResponse, error) { + cdt.MutateAdmissionReq = ar + cdt.MutateAdmissionCtx = ctx + return nil, nil + }, + ConvertObjectsFunc: func(ctx context.Context, cr *backend.ConversionRequest) (*backend.ConversionResponse, error) { + cdt.ConvertObjectReq = cr + cdt.ConvertObjectCtx = ctx + return nil, nil + }, + } + + for _, opt := range opts { + opt(cdt) + } + + mwHandler, err := backend.HandlerFromMiddlewares(cdt.TestHandler, cdt.Middlewares...) + if err != nil { + t.Fatalf("failed to create handler from middlewares: %s", err.Error()) + } + + if mwHandler == nil { + t.Fatal("create handler from middlewares not expected to be nil") + } + + cdt.MiddlewareHandler = mwHandler + + return cdt +} + +// WithMiddlewares HandlerMiddlewareTestOption option to append middlewares to HandlerMiddlewareTest. +func WithMiddlewares(middlewares ...backend.HandlerMiddleware) HandlerMiddlewareTestOption { + return HandlerMiddlewareTestOption(func(cdt *HandlerMiddlewareTest) { + if cdt.Middlewares == nil { + cdt.Middlewares = []backend.HandlerMiddleware{} + } + + cdt.Middlewares = append(cdt.Middlewares, middlewares...) + }) +} + +// WithResourceResponses can be used to make the test client send simulated resource responses back over the sender stream. +func WithResourceResponses(responses []*backend.CallResourceResponse) HandlerMiddlewareTestOption { + return HandlerMiddlewareTestOption(func(cdt *HandlerMiddlewareTest) { + cdt.callResourceResponses = responses + }) +} + +// WithRunStreamBytesResponses can be used to make the test client send simulated bytes responses back over the sender stream. +func WithRunStreamBytesResponses(responses [][]byte) HandlerMiddlewareTestOption { + return HandlerMiddlewareTestOption(func(cdt *HandlerMiddlewareTest) { + cdt.runStreamResponseBytes = responses + }) +} + +// WithRunStreamJSONResponses can be used to make the test client send simulated JSON responses back over the sender stream. +func WithRunStreamJSONResponses(responses [][]byte) HandlerMiddlewareTestOption { + return HandlerMiddlewareTestOption(func(cdt *HandlerMiddlewareTest) { + cdt.runStreamResponseJSONBytes = responses + }) +} diff --git a/backend/stream.go b/backend/stream.go index 87d73cabd..c6eade541 100644 --- a/backend/stream.go +++ b/backend/stream.go @@ -5,9 +5,8 @@ import ( "encoding/json" "fmt" - "github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2" - "github.com/grafana/grafana-plugin-sdk-go/data" + "github.com/grafana/grafana-plugin-sdk-go/genproto/pluginv2" ) const ( @@ -21,27 +20,74 @@ const ( EndpointRunStream Endpoint = "runStream" ) -// StreamHandler handles streams. -// This is EXPERIMENTAL and is a subject to change till Grafana 8. -type StreamHandler interface { +// SubscribeStreamHandler handles stream subscription. +type SubscribeStreamHandler interface { // SubscribeStream called when a user tries to subscribe to a plugin/datasource // managed channel path – thus plugin can check subscribe permissions and communicate // options with Grafana Core. As soon as first subscriber joins channel RunStream // will be called. - SubscribeStream(context.Context, *SubscribeStreamRequest) (*SubscribeStreamResponse, error) + SubscribeStream(ctx context.Context, req *SubscribeStreamRequest) (*SubscribeStreamResponse, error) +} + +// SubscribeStreamHandlerFunc is an adapter to allow the use of +// ordinary functions as backend.SubscribeStreamHandler. If f is a function +// with the appropriate signature, SubscribeStreamHandlerFunc(f) is a +// Handler that calls f. +type SubscribeStreamHandlerFunc func(ctx context.Context, req *SubscribeStreamRequest) (*SubscribeStreamResponse, error) + +// SubscribeStream calls fn(ctx, req, sender). +func (fn SubscribeStreamHandlerFunc) SubscribeStream(ctx context.Context, req *SubscribeStreamRequest) (*SubscribeStreamResponse, error) { + return fn(ctx, req) +} + +// PublishStreamHandler handles stream publication. +type PublishStreamHandler interface { // PublishStream called when a user tries to publish to a plugin/datasource // managed channel path. Here plugin can check publish permissions and // modify publication data if required. - PublishStream(context.Context, *PublishStreamRequest) (*PublishStreamResponse, error) + PublishStream(ctx context.Context, req *PublishStreamRequest) (*PublishStreamResponse, error) +} + +// PublishStreamHandlerFunc is an adapter to allow the use of +// ordinary functions as backend.PublishStreamHandler. If f is a function +// with the appropriate signature, SubscribeStreamHandlerFunc(f) is a +// Handler that calls f. +type PublishStreamHandlerFunc func(ctx context.Context, req *PublishStreamRequest) (*PublishStreamResponse, error) + +// SubscribeStream calls fn(ctx, req, sender). +func (fn PublishStreamHandlerFunc) PublishStream(ctx context.Context, req *PublishStreamRequest) (*PublishStreamResponse, error) { + return fn(ctx, req) +} + +// RunStreamHandler handles running of streams. +type RunStreamHandler interface { // RunStream will be initiated by Grafana to consume a stream. RunStream will be // called once for the first client successfully subscribed to a channel path. // When Grafana detects that there are no longer any subscribers inside a channel, // the call will be terminated until next active subscriber appears. Call termination // can happen with a delay. - RunStream(context.Context, *RunStreamRequest, *StreamSender) error + RunStream(ctx context.Context, req *RunStreamRequest, sender *StreamSender) error +} + +// RunStreamHandlerFunc is an adapter to allow the use of +// ordinary functions as backend.RunStreamHandler. If f is a function +// with the appropriate signature, RunStreamHandlerFunc(f) is a +// Handler that calls f. +type RunStreamHandlerFunc func(ctx context.Context, req *RunStreamRequest, sender *StreamSender) error + +// RunStream calls fn(ctx, req, sender). +func (fn RunStreamHandlerFunc) RunStream(ctx context.Context, req *RunStreamRequest, sender *StreamSender) error { + return fn(ctx, req, sender) +} + +// StreamHandler handles streams. +type StreamHandler interface { + SubscribeStreamHandler + PublishStreamHandler + RunStreamHandler } -// SubscribeStreamRequest is EXPERIMENTAL and is a subject to change till Grafana 8. +// SubscribeStreamRequest represents a request for a subscribe stream call. type SubscribeStreamRequest struct { PluginContext PluginContext Path string @@ -60,7 +106,7 @@ const ( SubscribeStreamStatusPermissionDenied SubscribeStreamStatus = 2 ) -// SubscribeStreamResponse is EXPERIMENTAL and is a subject to change till Grafana 8. +// SubscribeStreamResponse represents a response for a subscribe stream call. type SubscribeStreamResponse struct { Status SubscribeStreamStatus InitialData *InitialData @@ -97,7 +143,7 @@ func NewInitialData(data json.RawMessage) (*InitialData, error) { }, nil } -// PublishStreamRequest is EXPERIMENTAL and is a subject to change till Grafana 8. +// PublishStreamRequest represents a request for a publish stream call. type PublishStreamRequest struct { PluginContext PluginContext Path string @@ -116,35 +162,35 @@ const ( PublishStreamStatusPermissionDenied PublishStreamStatus = 2 ) -// PublishStreamResponse is EXPERIMENTAL and is a subject to change till Grafana 8. +// PublishStreamResponse represents a response for a publish stream call. type PublishStreamResponse struct { Status PublishStreamStatus Data json.RawMessage } -// RunStreamRequest is EXPERIMENTAL and is a subject to change till Grafana 8. +// RunStreamRequest represents a request for a run stream call. type RunStreamRequest struct { PluginContext PluginContext Path string Data json.RawMessage } -// StreamPacket is EXPERIMENTAL and is a subject to change till Grafana 8. +// StreamPacket represent a stream packet. type StreamPacket struct { Data json.RawMessage } -// StreamPacketSender is EXPERIMENTAL and is a subject to change till Grafana 8. +// StreamPacketSender is used for sending StreamPacket responses. type StreamPacketSender interface { Send(*StreamPacket) error } // StreamSender allows sending data to a stream. -// StreamSender is EXPERIMENTAL and is a subject to change till Grafana 8. type StreamSender struct { packetSender StreamPacketSender } +// NewStreamSender createa a new StreamSender. func NewStreamSender(packetSender StreamPacketSender) *StreamSender { return &StreamSender{packetSender: packetSender} }