Skip to content

Commit

Permalink
Reject invocations if workflow does not exist
Browse files Browse the repository at this point in the history
  • Loading branch information
erwinvaneyk committed Oct 8, 2018
1 parent 15cc81c commit 684aa89
Show file tree
Hide file tree
Showing 6 changed files with 52 additions and 31 deletions.
9 changes: 4 additions & 5 deletions cmd/fission-workflows-bundle/bundle/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -198,10 +198,9 @@ func Run(ctx context.Context, opts *Options) error {
invocationAPI := api.NewInvocationAPI(es)
resolvers := map[string]fnenv.RuntimeResolver{}
runtimes := map[string]fnenv.Runtime{}

reflectiveRuntime := workflows.NewRuntime(invocationAPI, invocationStore, workflowStore)
if opts.InternalRuntime || opts.Fission != nil {
log.Infof("Using Task Runtime: Workflow")
reflectiveRuntime := workflows.NewRuntime(invocationAPI, invocationStore)
runtimes[workflows.Name] = reflectiveRuntime
} else {
log.Info("No function runtimes specified.")
Expand Down Expand Up @@ -297,7 +296,7 @@ func Run(ctx context.Context, opts *Options) error {
}

if opts.InvocationAPI {
serveInvocationAPI(grpcServer, es, invocationStore)
serveInvocationAPI(grpcServer, es, invocationStore, workflowStore)
}

if opts.AdminAPI || opts.WorkflowAPI || opts.InvocationAPI {
Expand Down Expand Up @@ -470,9 +469,9 @@ func serveWorkflowAPI(s *grpc.Server, es fes.Backend, resolvers map[string]fnenv
log.Infof("Serving workflow gRPC API at %s.", gRPCAddress)
}

func serveInvocationAPI(s *grpc.Server, es fes.Backend, store *store.Invocations) {
func serveInvocationAPI(s *grpc.Server, es fes.Backend, invocations *store.Invocations, workflows *store.Workflows) {
invocationAPI := api.NewInvocationAPI(es)
invocationServer := apiserver.NewInvocation(invocationAPI, store)
invocationServer := apiserver.NewInvocation(invocationAPI, invocations, workflows)
apiserver.RegisterWorkflowInvocationAPIServer(s, invocationServer)
log.Infof("Serving workflow invocation gRPC API at %s.", gRPCAddress)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ Package apiserver contains all request handlers for gRPC and HTTP servers.
It has these top-level request handlers:
Admin - Administrative functionality related to managing the workflow engine.
Invocation - Functionality related to managing store.
Invocation - Functionality related to managing invocations.
Workflow - functionality related to managing workflows.
The purpose of this package is purely to provide handlers to gRPC and HTTP servers. Therefore,
it should not contain any logic (validation, composition, etc.) related to the workflows,
store or any other targets that it provides. All this logic should be placed in the actual
invocations or any other targets that it provides. All this logic should be placed in the actual
packages that are responsible for the business logic, such as `api`.
*/
package apiserver
Expand Down
41 changes: 26 additions & 15 deletions pkg/apiserver/invocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,18 +13,20 @@ import (
"golang.org/x/net/context"
)

// Invocation is responsible for all functionality related to managing store.
// Invocation is responsible for all functionality related to managing invocations.
type Invocation struct {
api *api.Invocation
store *store.Invocations
fnenv *workflows.Runtime
api *api.Invocation
invocations *store.Invocations
workflows *store.Workflows
fnenv *workflows.Runtime
}

func NewInvocation(api *api.Invocation, store *store.Invocations) WorkflowInvocationAPIServer {
func NewInvocation(api *api.Invocation, invocations *store.Invocations, workflows2 *store.Workflows) WorkflowInvocationAPIServer {
return &Invocation{
api: api,
store: store,
fnenv: workflows.NewRuntime(api, store),
api: api,
invocations: invocations,
workflows: workflows2,
fnenv: workflows.NewRuntime(api, invocations, workflows2),
}
}

Expand All @@ -37,6 +39,15 @@ func (gi *Invocation) Validate(ctx context.Context, spec *types.WorkflowInvocati
}

func (gi *Invocation) Invoke(ctx context.Context, spec *types.WorkflowInvocationSpec) (*types.ObjectMetadata, error) {
// TODO go through same runtime as InvokeSync
// Check if the workflow required by the invocation exists
if gi.workflows != nil {
_, err := gi.workflows.GetWorkflow(spec.GetWorkflowId())
if err != nil {
return nil, err
}
}

eventID, err := gi.api.Invoke(spec, api.WithContext(ctx))
if err != nil {
return nil, toErrorStatus(err)
Expand All @@ -63,7 +74,7 @@ func (gi *Invocation) Cancel(ctx context.Context, objectMetadata *types.ObjectMe
}

func (gi *Invocation) Get(ctx context.Context, objectMetadata *types.ObjectMetadata) (*types.WorkflowInvocation, error) {
wi, err := gi.store.GetInvocation(objectMetadata.GetId())
wi, err := gi.invocations.GetInvocation(objectMetadata.GetId())
if err != nil {
return nil, toErrorStatus(err)
}
Expand All @@ -72,18 +83,18 @@ func (gi *Invocation) Get(ctx context.Context, objectMetadata *types.ObjectMetad

func (gi *Invocation) List(ctx context.Context, query *InvocationListQuery) (*WorkflowInvocationList, error) {
var invocations []string
as := gi.store.List()
as := gi.invocations.List()
for _, aggregate := range as {
if aggregate.Type != aggregates.TypeWorkflowInvocation {
logrus.Errorf("Invalid type in invocation store: %v", aggregate.Format())
logrus.Errorf("Invalid type in invocation invocations: %v", aggregate.Format())
continue
}

if len(query.Workflows) > 0 {
// TODO make more efficient (by moving list queries to store)
entity, err := gi.store.GetAggregate(aggregate)
// TODO make more efficient (by moving list queries to invocations)
entity, err := gi.invocations.GetAggregate(aggregate)
if err != nil {
logrus.Errorf("List: failed to fetch %v from store: %v", aggregate, err)
logrus.Errorf("List: failed to fetch %v from invocations: %v", aggregate, err)
continue
}
wfi := entity.(*aggregates.WorkflowInvocation)
Expand All @@ -98,7 +109,7 @@ func (gi *Invocation) List(ctx context.Context, query *InvocationListQuery) (*Wo
}

func (gi *Invocation) AddTask(ctx context.Context, req *AddTaskRequest) (*empty.Empty, error) {
invocation, err := gi.store.GetInvocation(req.GetInvocationID())
invocation, err := gi.invocations.GetInvocation(req.GetInvocationID())
if err != nil {
return nil, toErrorStatus(err)
}
Expand Down
20 changes: 15 additions & 5 deletions pkg/fnenv/workflows/workflows.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,17 @@ var terminationEvent = []string{
// Runtime provides an abstraction of the workflow engine itself to use as a Task runtime environment.
type Runtime struct {
api *api.Invocation
store *store.Invocations
invocations *store.Invocations
workflows *store.Workflows
timeout time.Duration
pollInterval time.Duration
}

func NewRuntime(api *api.Invocation, store *store.Invocations) *Runtime {
func NewRuntime(api *api.Invocation, invocations *store.Invocations, workflows *store.Workflows) *Runtime {
return &Runtime{
api: api,
store: store,
invocations: invocations,
workflows: workflows,
pollInterval: PollInterval,
timeout: Timeout,
}
Expand Down Expand Up @@ -83,6 +85,14 @@ func (rt *Runtime) InvokeWorkflow(spec *types.WorkflowInvocationSpec, opts ...fn
return nil, err
}

// Check if the workflow required by the invocation exists
if rt.workflows != nil {
_, err := rt.workflows.GetWorkflow(spec.GetWorkflowId())
if err != nil {
return nil, err
}
}

timeStart := time.Now()
fnenv.FnActive.WithLabelValues(Name).Inc()
defer fnenv.FnExecTime.WithLabelValues(Name).Observe(float64(time.Since(timeStart)))
Expand All @@ -98,7 +108,7 @@ func (rt *Runtime) InvokeWorkflow(spec *types.WorkflowInvocationSpec, opts ...fn

timedCtx, cancelFn := context.WithTimeout(cfg.Ctx, rt.timeout)
defer cancelFn()
if pub, ok := rt.store.CacheReader.(pubsub.Publisher); ok {
if pub, ok := rt.invocations.CacheReader.(pubsub.Publisher); ok {
sub := pub.Subscribe(pubsub.SubscriptionOptions{
Buffer: 1,
LabelMatcher: labels.And(
Expand Down Expand Up @@ -141,7 +151,7 @@ func (rt *Runtime) InvokeWorkflow(spec *types.WorkflowInvocationSpec, opts ...fn
// checkForResult checks if the invocation with the specified ID has completed yet.
// If so it will return the workflow invocation object, otherwise it will return nil.
func (rt *Runtime) checkForResult(wfiID string) *types.WorkflowInvocation {
wi, err := rt.store.GetInvocation(wfiID)
wi, err := rt.invocations.GetInvocation(wfiID)
if err != nil {
logrus.Debugf("Could not find workflow invocation in cache: %v", err)
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/fnenv/workflows/workflows_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestRuntime_InvokeWorkflow_SubTimeout(t *testing.T) {

func TestRuntime_InvokeWorkflow_PollTimeout(t *testing.T) {
runtime, _, _, _ := setup()
runtime.store = store.NewInvocationStore(testutil.NewCache()) // ensure that cache does not support pubsub
runtime.invocations = store.NewInvocationStore(testutil.NewCache()) // ensure that cache does not support pubsub
runtime.timeout = 10 * time.Millisecond
runtime.pollInterval = 10 * time.Millisecond

Expand Down Expand Up @@ -67,7 +67,7 @@ func TestRuntime_InvokeWorkflow_SubSuccess(t *testing.T) {
func TestRuntime_InvokeWorkflow_PollSuccess(t *testing.T) {
runtime, invocationAPI, _, c := setup()
pollCache := store.NewInvocationStore(testutil.NewCache()) // ensure that cache does not support pubsub
runtime.store = pollCache
runtime.invocations = pollCache

output := typedvalues.MustWrap("foo")
go func() {
Expand Down Expand Up @@ -157,7 +157,7 @@ func setup() (*Runtime, *api.Invocation, *mem.Backend, fes.CacheReaderWriter) {
backend := mem.NewBackend()
invocationAPI := api.NewInvocationAPI(backend)
c := cache.NewSubscribedCache(testutil.NewCache(), aggregates.NewInvocationEntity, backend.Subscribe())
runtime := NewRuntime(invocationAPI, store.NewInvocationStore(c))
runtime := NewRuntime(invocationAPI, store.NewInvocationStore(c), nil)
runtime.timeout = 5 * time.Second
return runtime, invocationAPI, backend, c
}
3 changes: 2 additions & 1 deletion test/e2e/buildtest.sh
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ helm_install_fission_workflows ${fissionWorkflowsHelmId} ${NS} "pullPolicy=Alway
fission-workflows config
emph "Waiting for Fission Workflows to be ready..."
sleep 5
retry fission-workflows status
retry fission workflows status
fission workflows workflows get # fission workflows status sometimes succeeds even though workflows is not ready
echo
emph "Fission Workflows deployed!"

Expand Down

0 comments on commit 684aa89

Please sign in to comment.