From 684aa89d7e0bf0aacb06189249de16433edc9eff Mon Sep 17 00:00:00 2001 From: erwinvaneyk Date: Mon, 8 Oct 2018 13:39:50 +0200 Subject: [PATCH] Reject invocations if workflow does not exist --- cmd/fission-workflows-bundle/bundle/bundle.go | 9 ++-- pkg/apiserver/apiserver.go | 4 +- pkg/apiserver/invocation.go | 41 ++++++++++++------- pkg/fnenv/workflows/workflows.go | 20 ++++++--- pkg/fnenv/workflows/workflows_test.go | 6 +-- test/e2e/buildtest.sh | 3 +- 6 files changed, 52 insertions(+), 31 deletions(-) diff --git a/cmd/fission-workflows-bundle/bundle/bundle.go b/cmd/fission-workflows-bundle/bundle/bundle.go index 5dc2403b..4d788f32 100644 --- a/cmd/fission-workflows-bundle/bundle/bundle.go +++ b/cmd/fission-workflows-bundle/bundle/bundle.go @@ -198,10 +198,9 @@ func Run(ctx context.Context, opts *Options) error { invocationAPI := api.NewInvocationAPI(es) resolvers := map[string]fnenv.RuntimeResolver{} runtimes := map[string]fnenv.Runtime{} - + reflectiveRuntime := workflows.NewRuntime(invocationAPI, invocationStore, workflowStore) if opts.InternalRuntime || opts.Fission != nil { log.Infof("Using Task Runtime: Workflow") - reflectiveRuntime := workflows.NewRuntime(invocationAPI, invocationStore) runtimes[workflows.Name] = reflectiveRuntime } else { log.Info("No function runtimes specified.") @@ -297,7 +296,7 @@ func Run(ctx context.Context, opts *Options) error { } if opts.InvocationAPI { - serveInvocationAPI(grpcServer, es, invocationStore) + serveInvocationAPI(grpcServer, es, invocationStore, workflowStore) } if opts.AdminAPI || opts.WorkflowAPI || opts.InvocationAPI { @@ -470,9 +469,9 @@ func serveWorkflowAPI(s *grpc.Server, es fes.Backend, resolvers map[string]fnenv log.Infof("Serving workflow gRPC API at %s.", gRPCAddress) } -func serveInvocationAPI(s *grpc.Server, es fes.Backend, store *store.Invocations) { +func serveInvocationAPI(s *grpc.Server, es fes.Backend, invocations *store.Invocations, workflows *store.Workflows) { invocationAPI := api.NewInvocationAPI(es) - invocationServer := apiserver.NewInvocation(invocationAPI, store) + invocationServer := apiserver.NewInvocation(invocationAPI, invocations, workflows) apiserver.RegisterWorkflowInvocationAPIServer(s, invocationServer) log.Infof("Serving workflow invocation gRPC API at %s.", gRPCAddress) } diff --git a/pkg/apiserver/apiserver.go b/pkg/apiserver/apiserver.go index e3fe1f98..d0af49b8 100644 --- a/pkg/apiserver/apiserver.go +++ b/pkg/apiserver/apiserver.go @@ -3,12 +3,12 @@ Package apiserver contains all request handlers for gRPC and HTTP servers. It has these top-level request handlers: Admin - Administrative functionality related to managing the workflow engine. - Invocation - Functionality related to managing store. + Invocation - Functionality related to managing invocations. Workflow - functionality related to managing workflows. The purpose of this package is purely to provide handlers to gRPC and HTTP servers. Therefore, it should not contain any logic (validation, composition, etc.) related to the workflows, -store or any other targets that it provides. All this logic should be placed in the actual +invocations or any other targets that it provides. All this logic should be placed in the actual packages that are responsible for the business logic, such as `api`. */ package apiserver diff --git a/pkg/apiserver/invocation.go b/pkg/apiserver/invocation.go index 4681c67c..9db0f973 100644 --- a/pkg/apiserver/invocation.go +++ b/pkg/apiserver/invocation.go @@ -13,18 +13,20 @@ import ( "golang.org/x/net/context" ) -// Invocation is responsible for all functionality related to managing store. +// Invocation is responsible for all functionality related to managing invocations. type Invocation struct { - api *api.Invocation - store *store.Invocations - fnenv *workflows.Runtime + api *api.Invocation + invocations *store.Invocations + workflows *store.Workflows + fnenv *workflows.Runtime } -func NewInvocation(api *api.Invocation, store *store.Invocations) WorkflowInvocationAPIServer { +func NewInvocation(api *api.Invocation, invocations *store.Invocations, workflows2 *store.Workflows) WorkflowInvocationAPIServer { return &Invocation{ - api: api, - store: store, - fnenv: workflows.NewRuntime(api, store), + api: api, + invocations: invocations, + workflows: workflows2, + fnenv: workflows.NewRuntime(api, invocations, workflows2), } } @@ -37,6 +39,15 @@ func (gi *Invocation) Validate(ctx context.Context, spec *types.WorkflowInvocati } func (gi *Invocation) Invoke(ctx context.Context, spec *types.WorkflowInvocationSpec) (*types.ObjectMetadata, error) { + // TODO go through same runtime as InvokeSync + // Check if the workflow required by the invocation exists + if gi.workflows != nil { + _, err := gi.workflows.GetWorkflow(spec.GetWorkflowId()) + if err != nil { + return nil, err + } + } + eventID, err := gi.api.Invoke(spec, api.WithContext(ctx)) if err != nil { return nil, toErrorStatus(err) @@ -63,7 +74,7 @@ func (gi *Invocation) Cancel(ctx context.Context, objectMetadata *types.ObjectMe } func (gi *Invocation) Get(ctx context.Context, objectMetadata *types.ObjectMetadata) (*types.WorkflowInvocation, error) { - wi, err := gi.store.GetInvocation(objectMetadata.GetId()) + wi, err := gi.invocations.GetInvocation(objectMetadata.GetId()) if err != nil { return nil, toErrorStatus(err) } @@ -72,18 +83,18 @@ func (gi *Invocation) Get(ctx context.Context, objectMetadata *types.ObjectMetad func (gi *Invocation) List(ctx context.Context, query *InvocationListQuery) (*WorkflowInvocationList, error) { var invocations []string - as := gi.store.List() + as := gi.invocations.List() for _, aggregate := range as { if aggregate.Type != aggregates.TypeWorkflowInvocation { - logrus.Errorf("Invalid type in invocation store: %v", aggregate.Format()) + logrus.Errorf("Invalid type in invocation invocations: %v", aggregate.Format()) continue } if len(query.Workflows) > 0 { - // TODO make more efficient (by moving list queries to store) - entity, err := gi.store.GetAggregate(aggregate) + // TODO make more efficient (by moving list queries to invocations) + entity, err := gi.invocations.GetAggregate(aggregate) if err != nil { - logrus.Errorf("List: failed to fetch %v from store: %v", aggregate, err) + logrus.Errorf("List: failed to fetch %v from invocations: %v", aggregate, err) continue } wfi := entity.(*aggregates.WorkflowInvocation) @@ -98,7 +109,7 @@ func (gi *Invocation) List(ctx context.Context, query *InvocationListQuery) (*Wo } func (gi *Invocation) AddTask(ctx context.Context, req *AddTaskRequest) (*empty.Empty, error) { - invocation, err := gi.store.GetInvocation(req.GetInvocationID()) + invocation, err := gi.invocations.GetInvocation(req.GetInvocationID()) if err != nil { return nil, toErrorStatus(err) } diff --git a/pkg/fnenv/workflows/workflows.go b/pkg/fnenv/workflows/workflows.go index e52f1e7e..8986fa01 100644 --- a/pkg/fnenv/workflows/workflows.go +++ b/pkg/fnenv/workflows/workflows.go @@ -45,15 +45,17 @@ var terminationEvent = []string{ // Runtime provides an abstraction of the workflow engine itself to use as a Task runtime environment. type Runtime struct { api *api.Invocation - store *store.Invocations + invocations *store.Invocations + workflows *store.Workflows timeout time.Duration pollInterval time.Duration } -func NewRuntime(api *api.Invocation, store *store.Invocations) *Runtime { +func NewRuntime(api *api.Invocation, invocations *store.Invocations, workflows *store.Workflows) *Runtime { return &Runtime{ api: api, - store: store, + invocations: invocations, + workflows: workflows, pollInterval: PollInterval, timeout: Timeout, } @@ -83,6 +85,14 @@ func (rt *Runtime) InvokeWorkflow(spec *types.WorkflowInvocationSpec, opts ...fn return nil, err } + // Check if the workflow required by the invocation exists + if rt.workflows != nil { + _, err := rt.workflows.GetWorkflow(spec.GetWorkflowId()) + if err != nil { + return nil, err + } + } + timeStart := time.Now() fnenv.FnActive.WithLabelValues(Name).Inc() defer fnenv.FnExecTime.WithLabelValues(Name).Observe(float64(time.Since(timeStart))) @@ -98,7 +108,7 @@ func (rt *Runtime) InvokeWorkflow(spec *types.WorkflowInvocationSpec, opts ...fn timedCtx, cancelFn := context.WithTimeout(cfg.Ctx, rt.timeout) defer cancelFn() - if pub, ok := rt.store.CacheReader.(pubsub.Publisher); ok { + if pub, ok := rt.invocations.CacheReader.(pubsub.Publisher); ok { sub := pub.Subscribe(pubsub.SubscriptionOptions{ Buffer: 1, LabelMatcher: labels.And( @@ -141,7 +151,7 @@ func (rt *Runtime) InvokeWorkflow(spec *types.WorkflowInvocationSpec, opts ...fn // checkForResult checks if the invocation with the specified ID has completed yet. // If so it will return the workflow invocation object, otherwise it will return nil. func (rt *Runtime) checkForResult(wfiID string) *types.WorkflowInvocation { - wi, err := rt.store.GetInvocation(wfiID) + wi, err := rt.invocations.GetInvocation(wfiID) if err != nil { logrus.Debugf("Could not find workflow invocation in cache: %v", err) } diff --git a/pkg/fnenv/workflows/workflows_test.go b/pkg/fnenv/workflows/workflows_test.go index f3f1e7af..3384ddb5 100644 --- a/pkg/fnenv/workflows/workflows_test.go +++ b/pkg/fnenv/workflows/workflows_test.go @@ -30,7 +30,7 @@ func TestRuntime_InvokeWorkflow_SubTimeout(t *testing.T) { func TestRuntime_InvokeWorkflow_PollTimeout(t *testing.T) { runtime, _, _, _ := setup() - runtime.store = store.NewInvocationStore(testutil.NewCache()) // ensure that cache does not support pubsub + runtime.invocations = store.NewInvocationStore(testutil.NewCache()) // ensure that cache does not support pubsub runtime.timeout = 10 * time.Millisecond runtime.pollInterval = 10 * time.Millisecond @@ -67,7 +67,7 @@ func TestRuntime_InvokeWorkflow_SubSuccess(t *testing.T) { func TestRuntime_InvokeWorkflow_PollSuccess(t *testing.T) { runtime, invocationAPI, _, c := setup() pollCache := store.NewInvocationStore(testutil.NewCache()) // ensure that cache does not support pubsub - runtime.store = pollCache + runtime.invocations = pollCache output := typedvalues.MustWrap("foo") go func() { @@ -157,7 +157,7 @@ func setup() (*Runtime, *api.Invocation, *mem.Backend, fes.CacheReaderWriter) { backend := mem.NewBackend() invocationAPI := api.NewInvocationAPI(backend) c := cache.NewSubscribedCache(testutil.NewCache(), aggregates.NewInvocationEntity, backend.Subscribe()) - runtime := NewRuntime(invocationAPI, store.NewInvocationStore(c)) + runtime := NewRuntime(invocationAPI, store.NewInvocationStore(c), nil) runtime.timeout = 5 * time.Second return runtime, invocationAPI, backend, c } diff --git a/test/e2e/buildtest.sh b/test/e2e/buildtest.sh index 89a42699..06068226 100755 --- a/test/e2e/buildtest.sh +++ b/test/e2e/buildtest.sh @@ -88,7 +88,8 @@ helm_install_fission_workflows ${fissionWorkflowsHelmId} ${NS} "pullPolicy=Alway fission-workflows config emph "Waiting for Fission Workflows to be ready..." sleep 5 -retry fission-workflows status +retry fission workflows status +fission workflows workflows get # fission workflows status sometimes succeeds even though workflows is not ready echo emph "Fission Workflows deployed!"