Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Orchestration ID reuse policies #51

Merged
merged 4 commits into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions api/orchestration.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,29 @@ var (
EmptyInstanceID = InstanceID("")
)

// CreateOrchestrationAction is an alias of protos.CreateOrchestrationAction. It is
// the type of the Action field of an OrchestrationIdReusePolicy and selects what
// happens when a new orchestration is scheduled with an instance ID that already
// exists.
type CreateOrchestrationAction = protos.CreateOrchestrationAction

// Supported CreateOrchestrationAction values.
// NOTE(review): the numeric values presumably mirror the protos enum - confirm
// before changing them.
const (
// ERROR is the zero value.
// NOTE(review): presumably rejects the new request with a duplicate-instance
// error - confirm against the backend implementation.
ERROR CreateOrchestrationAction = 0
// IGNORE leaves the existing instance untouched and does not create a new one.
IGNORE CreateOrchestrationAction = 1
// TERMINATE terminates the existing instance and starts a new one with the
// same ID.
TERMINATE CreateOrchestrationAction = 2
)

// OrchestrationStatus is an alias of protos.OrchestrationStatus. Values of this
// type are listed in the OperationStatus field of an OrchestrationIdReusePolicy.
type OrchestrationStatus = protos.OrchestrationStatus

// Orchestration runtime status values.
// NOTE(review): the numeric values presumably mirror the protos enum - confirm
// before changing them.
const (
RUNNING OrchestrationStatus = 0
COMPLETED OrchestrationStatus = 1
CONTINUED_AS_NEW OrchestrationStatus = 2
FAILED OrchestrationStatus = 3
CANCELED OrchestrationStatus = 4
TERMINATED OrchestrationStatus = 5
PENDING OrchestrationStatus = 6
SUSPENDED OrchestrationStatus = 7
)

// OrchestrationIdReusePolicy is an alias of protos.OrchestrationIdReusePolicy.
// It pairs an Action (a CreateOrchestrationAction) with OperationStatus, a list
// of OrchestrationStatus values.
// NOTE(review): the action appears to apply only when the existing instance is
// in one of the listed statuses - confirm against the backend implementation.
type OrchestrationIdReusePolicy = protos.OrchestrationIdReusePolicy

// InstanceID is a unique identifier for an orchestration instance.
type InstanceID string

Expand Down
8 changes: 8 additions & 0 deletions backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,3 +122,11 @@ func UnmarshalHistoryEvent(bytes []byte) (*HistoryEvent, error) {
}
return e, nil
}

// BuildStatusSet converts a slice of orchestration statuses into a set keyed by
// status so that membership can be tested with a single map lookup. Duplicate
// entries collapse into one key; a nil or empty slice yields an empty, non-nil
// map.
func BuildStatusSet(statuses []api.OrchestrationStatus) map[api.OrchestrationStatus]struct{} {
	set := make(map[api.OrchestrationStatus]struct{}, len(statuses))
	for i := range statuses {
		set[statuses[i]] = struct{}{}
	}
	return set
}
2 changes: 1 addition & 1 deletion backend/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func (c *backendClient) ScheduleNewOrchestration(ctx context.Context, orchestrat

tc := helpers.TraceContextFromSpan(span)
e := helpers.NewExecutionStartedEvent(req.Name, req.InstanceId, req.Input, nil, tc)
if err := c.be.CreateOrchestrationInstance(ctx, e); err != nil {
if err := c.be.CreateOrchestrationInstance(ctx, e, WithOrchestrationIdReusePolicy(req.OrchestrationIdReusePolicy)); err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return api.EmptyInstanceID, fmt.Errorf("failed to start orchestration: %w", err)
Expand Down
23 changes: 8 additions & 15 deletions backend/sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,15 +434,7 @@ func (be *sqliteBackend) CreateOrchestrationInstance(ctx context.Context, e *bac
return nil
}

// buildStatusSet converts a slice of orchestration statuses into a set keyed by
// status so that membership can be tested with a single map lookup. Duplicate
// entries collapse into one key; a nil or empty slice yields an empty, non-nil
// map.
func buildStatusSet(statuses []protos.OrchestrationStatus) map[protos.OrchestrationStatus]struct{} {
	// The final size is bounded by len(statuses), so pass it as a capacity
	// hint to avoid incremental map growth.
	statusSet := make(map[protos.OrchestrationStatus]struct{}, len(statuses))
	for _, status := range statuses {
		statusSet[status] = struct{}{}
	}
	return statusSet
}

func (be *sqliteBackend) createOrchestrationInstanceInternal(ctx context.Context, e *backend.HistoryEvent, tx *sql.Tx, opts ...backend.OrchestrationIdReusePolicyOptions) (string, error) {
func (be *sqliteBackend) createOrchestrationInstanceInternal(ctx context.Context, e *backend.HistoryEvent, tx *sql.Tx, opts ...backend.OrchestrationIdReusePolicyOptions) (string, error) {
if e == nil {
return "", errors.New("HistoryEvent must be non-nil")
} else if e.Timestamp == nil {
Expand All @@ -466,6 +458,7 @@ func (be *sqliteBackend) createOrchestrationInstanceInternal(ctx context.Context
return "", err
}

// instance with same ID already exists
if rows <= 0 {
return instanceID, be.handleInstanceExists(ctx, tx, startEvent, policy, e)
}
Expand Down Expand Up @@ -500,7 +493,7 @@ func insertOrIgnoreInstanceTableInternal(ctx context.Context, tx *sql.Tx, e *bac
if err != nil {
return -1, fmt.Errorf("failed to count the rows affected: %w", err)
}
return rows, nil;
return rows, nil
}

func (be *sqliteBackend) handleInstanceExists(ctx context.Context, tx *sql.Tx, startEvent *protos.ExecutionStartedEvent, policy *protos.OrchestrationIdReusePolicy, e *backend.HistoryEvent) error {
Expand All @@ -519,7 +512,7 @@ func (be *sqliteBackend) handleInstanceExists(ctx context.Context, tx *sql.Tx, s
}

// instance already exists
targetStatusValues := buildStatusSet(policy.OperationStatus)
targetStatusValues := backend.BuildStatusSet(policy.OperationStatus)
// the existing instance's status is not one of the policy's target statuses: return a duplicate-instance error
if _, ok := targetStatusValues[helpers.FromRuntimeStatusString(*runtimeStatus)]; !ok {
return api.ErrDuplicateInstance
Expand All @@ -533,7 +526,7 @@ func (be *sqliteBackend) handleInstanceExists(ctx context.Context, tx *sql.Tx, s
return api.ErrIgnoreInstance
case protos.CreateOrchestrationAction_TERMINATE:
// terminate existing instance
if err := be.cleanupOrchestrationStateInternal(ctx, tx, api.InstanceID(startEvent.OrchestrationInstance.InstanceId),false); err != nil {
if err := be.cleanupOrchestrationStateInternal(ctx, tx, api.InstanceID(startEvent.OrchestrationInstance.InstanceId), false); err != nil {
return err
}
// create a new instance
Expand All @@ -552,7 +545,7 @@ func (be *sqliteBackend) handleInstanceExists(ctx context.Context, tx *sql.Tx, s
return api.ErrDuplicateInstance
}

func (be *sqliteBackend) cleanupOrchestrationStateInternal(ctx context.Context, tx *sql.Tx, id api.InstanceID, onlyIfCompleted bool) error {
func (be *sqliteBackend) cleanupOrchestrationStateInternal(ctx context.Context, tx *sql.Tx, id api.InstanceID, requireCompleted bool) error {
row := tx.QueryRowContext(ctx, "SELECT 1 FROM Instances WHERE [InstanceID] = ?", string(id))
if err := row.Err(); err != nil {
return fmt.Errorf("failed to query for instance existence: %w", err)
Expand All @@ -565,13 +558,13 @@ func (be *sqliteBackend) cleanupOrchestrationStateInternal(ctx context.Context,
return fmt.Errorf("failed to scan instance existence: %w", err)
}

if onlyIfCompleted {
if requireCompleted {
// purge orchestration in ['COMPLETED', 'FAILED', 'TERMINATED']
dbResult, err := tx.ExecContext(ctx, "DELETE FROM Instances WHERE [InstanceID] = ? AND [RuntimeStatus] IN ('COMPLETED', 'FAILED', 'TERMINATED')", string(id))
if err != nil {
return fmt.Errorf("failed to delete from the Instances table: %w", err)
}

rowsAffected, err := dbResult.RowsAffected()
if err != nil {
return fmt.Errorf("failed to get rows affected in Instances delete operation: %w", err)
Expand Down
138 changes: 138 additions & 0 deletions tests/orchestrations_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -826,6 +826,144 @@ func Test_RecreateCompletedOrchestration(t *testing.T) {
)
}

// Test_SingleActivity_ReuseInstanceIDIgnore schedules an orchestration, waits for
// it to start, and then schedules a second one with the same instance ID and an
// IGNORE reuse policy. It expects the second request to succeed without
// replacing the first: the completed instance must carry the first request's
// output and a creation time earlier than the second request.
func Test_SingleActivity_ReuseInstanceIDIgnore(t *testing.T) {
	// Registration
	r := task.NewTaskRegistry()
	r.AddOrchestratorN("SingleActivity", func(ctx *task.OrchestrationContext) (any, error) {
		var input string
		if err := ctx.GetInput(&input); err != nil {
			return nil, err
		}
		var output string
		err := ctx.CallActivity("SayHello", task.WithActivityInput(input)).Await(&output)
		return output, err
	})
	r.AddActivityN("SayHello", func(ctx task.ActivityContext) (any, error) {
		var name string
		if err := ctx.GetInput(&name); err != nil {
			return nil, err
		}
		return fmt.Sprintf("Hello, %s!", name), nil
	})

	// Initialization
	ctx := context.Background()
	client, worker := initTaskHubWorker(ctx, r)
	defer worker.Shutdown(ctx)

	// Bound every wait below so a stuck worker fails the test instead of
	// hanging it.
	timeoutCtx, cancelTimeout := context.WithTimeout(ctx, 30*time.Second)
	defer cancelTimeout()

	instanceID := api.InstanceID("IGNORE_IF_RUNNING_OR_COMPLETED")
	reuseIdPolicy := &api.OrchestrationIdReusePolicy{
		Action:          api.IGNORE,
		OperationStatus: []api.OrchestrationStatus{api.RUNNING, api.COMPLETED, api.PENDING},
	}

	// Run the orchestration
	id, err := client.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("世界"), api.WithInstanceID(instanceID))
	require.NoError(t, err)
	// Wait for the orchestration to start; a failed wait would invalidate the
	// timestamp comparison below, so do not ignore its error.
	_, err = client.WaitForOrchestrationStart(timeoutCtx, id)
	require.NoError(t, err)
	pivotTime := time.Now()
	// Schedule again: creation of the new orchestration should be ignored.
	id, err = client.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("World"), api.WithInstanceID(id), api.WithOrchestrationIdReusePolicy(reuseIdPolicy))
	require.NoError(t, err)
	metadata, err := client.WaitForOrchestrationCompletion(timeoutCtx, id)
	require.NoError(t, err)
	assert.True(t, metadata.IsComplete())
	// The first orchestration should complete because the second one is ignored.
	assert.Equal(t, `"Hello, 世界!"`, metadata.SerializedOutput)
	// The instance must have been created before the second schedule call.
	assert.True(t, pivotTime.After(metadata.CreatedAt))
}

// Test_SingleActivity_ReuseInstanceIDTerminate schedules an orchestration, waits
// for it to start, and then schedules a second one with the same instance ID and
// a TERMINATE reuse policy. It expects the second request to replace the first:
// the completed instance must carry the second request's output and a creation
// time later than the moment the first instance was observed as started.
func Test_SingleActivity_ReuseInstanceIDTerminate(t *testing.T) {
	// Registration
	r := task.NewTaskRegistry()
	r.AddOrchestratorN("SingleActivity", func(ctx *task.OrchestrationContext) (any, error) {
		var input string
		if err := ctx.GetInput(&input); err != nil {
			return nil, err
		}
		var output string
		err := ctx.CallActivity("SayHello", task.WithActivityInput(input)).Await(&output)
		return output, err
	})
	r.AddActivityN("SayHello", func(ctx task.ActivityContext) (any, error) {
		var name string
		if err := ctx.GetInput(&name); err != nil {
			return nil, err
		}
		return fmt.Sprintf("Hello, %s!", name), nil
	})

	// Initialization
	ctx := context.Background()
	client, worker := initTaskHubWorker(ctx, r)
	defer worker.Shutdown(ctx)

	// Bound every wait below so a stuck worker fails the test instead of
	// hanging it.
	timeoutCtx, cancelTimeout := context.WithTimeout(ctx, 30*time.Second)
	defer cancelTimeout()

	instanceID := api.InstanceID("TERMINATE_IF_RUNNING_OR_COMPLETED")
	reuseIdPolicy := &api.OrchestrationIdReusePolicy{
		Action:          api.TERMINATE,
		OperationStatus: []api.OrchestrationStatus{api.RUNNING, api.COMPLETED, api.PENDING},
	}

	// Run the orchestration
	id, err := client.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("世界"), api.WithInstanceID(instanceID))
	require.NoError(t, err)
	// Wait for the orchestration to start; a failed wait would invalidate the
	// timestamp comparison below, so do not ignore its error.
	_, err = client.WaitForOrchestrationStart(timeoutCtx, id)
	require.NoError(t, err)
	pivotTime := time.Now()
	// Schedule again: the first orchestration should be terminated and a new
	// one started.
	id, err = client.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("World"), api.WithInstanceID(id), api.WithOrchestrationIdReusePolicy(reuseIdPolicy))
	require.NoError(t, err)
	metadata, err := client.WaitForOrchestrationCompletion(timeoutCtx, id)
	require.NoError(t, err)
	assert.True(t, metadata.IsComplete())
	// The second orchestration should complete.
	assert.Equal(t, `"Hello, World!"`, metadata.SerializedOutput)
	// The instance must have been (re)created after the first one had started.
	assert.True(t, pivotTime.Before(metadata.CreatedAt))
}

// Test_SingleActivity_ReuseInstanceIDError schedules an orchestration and then
// schedules a second one with the same instance ID and no reuse policy. It
// expects the second request to fail with an "orchestration instance already
// exists" error.
func Test_SingleActivity_ReuseInstanceIDError(t *testing.T) {
	// Registration
	r := task.NewTaskRegistry()
	r.AddOrchestratorN("SingleActivity", func(ctx *task.OrchestrationContext) (any, error) {
		var input string
		if err := ctx.GetInput(&input); err != nil {
			return nil, err
		}
		var output string
		err := ctx.CallActivity("SayHello", task.WithActivityInput(input)).Await(&output)
		return output, err
	})
	r.AddActivityN("SayHello", func(ctx task.ActivityContext) (any, error) {
		var name string
		if err := ctx.GetInput(&name); err != nil {
			return nil, err
		}
		return fmt.Sprintf("Hello, %s!", name), nil
	})

	// Initialization
	ctx := context.Background()
	client, worker := initTaskHubWorker(ctx, r)
	defer worker.Shutdown(ctx)

	instanceID := api.InstanceID("ERROR_IF_RUNNING_OR_COMPLETED")

	// Run the orchestration
	id, err := client.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("世界"), api.WithInstanceID(instanceID))
	require.NoError(t, err)
	// Schedule again with the same ID and no reuse policy. Only the error
	// matters here, so the returned ID is discarded.
	_, err = client.ScheduleNewOrchestration(ctx, "SingleActivity", api.WithInput("World"), api.WithInstanceID(id))
	if assert.Error(t, err) {
		assert.Contains(t, err.Error(), "orchestration instance already exists")
	}
}

func initTaskHubWorker(ctx context.Context, r *task.TaskRegistry, opts ...backend.NewTaskWorkerOptions) (backend.TaskHubClient, backend.TaskHubWorker) {
// TODO: Switch to options pattern
logger := backend.DefaultLogger()
Expand Down