Skip to content

Commit

Permalink
Merging main
Browse files Browse the repository at this point in the history
Signed-off-by: Shivam Kumar <shivamkm07@gmail.com>
  • Loading branch information
shivamkm07 committed Dec 13, 2023
2 parents 426c4a8 + 86338c4 commit b4a73b9
Show file tree
Hide file tree
Showing 13 changed files with 1,347 additions and 704 deletions.
21 changes: 20 additions & 1 deletion .github/workflows/pr-validation.yml
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ on:
# Allows you to run this workflow manually from the Actions tab
workflow_dispatch:

env:
# Configure protoc and go grpc plugin
PROTOC_VERSION: "25.x"
PROTOC_GEN_GO: "v1.28"
PROTOC_GEN_GO_GRPC: "v1.2"

# A workflow run is made up of one or more jobs that can run sequentially or in parallel
jobs:
# This workflow contains a single job called "build"
Expand All @@ -36,6 +42,19 @@ jobs:

- name: Install dependencies
run: go get .


- name: Install Protoc
uses: arduino/setup-protoc@v2
with:
version: ${{ env.PROTOC_VERSION }}

- name: Installing protoc-gen-go
run: |
go install google.golang.org/protobuf/cmd/protoc-gen-go@$PROTOC_GEN_GO
go install google.golang.org/grpc/cmd/protoc-gen-go-grpc@$PROTOC_GEN_GO_GRPC
- name: Generate grpc code
run: protoc --go_out=. --go-grpc_out=. -I ./submodules/durabletask-protobuf/protos orchestrator_service.proto

- name: Run integration tests
run: go test ./tests/... -coverpkg ./api,./task,./client,./backend/...,./internal/helpers
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [Unreleased]

### Changed

- Support reusing orchestration id ([#46](https://github.com/microsoft/durabletask-go/pull/46)) - contributed by [@kaibocai](https://github.com/kaibocai)

## [v0.3.1] - 2023-09-08

### Fixed
Expand Down
22 changes: 18 additions & 4 deletions api/orchestration.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,12 @@ import (
)

var (
ErrInstanceNotFound = errors.New("no such instance exists")
ErrNotStarted = errors.New("orchestration has not started")
ErrNotCompleted = errors.New("orchestration has not yet completed")
ErrNoFailures = errors.New("orchestration did not report failure details")
ErrInstanceNotFound = errors.New("no such instance exists")
ErrNotStarted = errors.New("orchestration has not started")
ErrNotCompleted = errors.New("orchestration has not yet completed")
ErrNoFailures = errors.New("orchestration did not report failure details")
ErrDuplicateInstance = errors.New("orchestration instance already exists")
ErrIgnoreInstance = errors.New("ignore creating orchestration instance")

EmptyInstanceID = InstanceID("")
)
Expand Down Expand Up @@ -60,6 +62,18 @@ func WithInstanceID(id InstanceID) NewOrchestrationOptions {
}
}

// WithOrchestrationIdReusePolicy configures Orchestration ID reuse policy.
func WithOrchestrationIdReusePolicy(policy *protos.OrchestrationIdReusePolicy) NewOrchestrationOptions {
return func(req *protos.CreateInstanceRequest) error {
// initialize CreateInstanceOption
req.OrchestrationIdReusePolicy = &protos.OrchestrationIdReusePolicy{
Action: policy.Action,
OperationStatus: policy.OperationStatus,
}
return nil
}
}

// WithInput configures an input for the orchestration. The specified input must be serializable.
func WithInput(input any) NewOrchestrationOptions {
return func(req *protos.CreateInstanceRequest) error {
Expand Down
14 changes: 13 additions & 1 deletion backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,18 @@ type (
TaskFailureDetails = protos.TaskFailureDetails
)

type OrchestrationIdReusePolicyOptions func(*protos.OrchestrationIdReusePolicy) error

func WithOrchestrationIdReusePolicy(policy *protos.OrchestrationIdReusePolicy) OrchestrationIdReusePolicyOptions {
return func(po *protos.OrchestrationIdReusePolicy) error {
if policy != nil {
po.Action = policy.Action
po.OperationStatus = policy.OperationStatus
}
return nil
}
}

type Backend interface {
// CreateTaskHub creates a new task hub for the current backend. Task hub creation must be idempotent.
//
Expand All @@ -43,7 +55,7 @@ type Backend interface {

// CreateOrchestrationInstance creates a new orchestration instance with a history event that
// wraps a ExecutionStarted event.
CreateOrchestrationInstance(context.Context, *HistoryEvent) error
CreateOrchestrationInstance(context.Context, *HistoryEvent, ...OrchestrationIdReusePolicyOptions) error

// AddNewEvent adds a new orchestration event to the specified orchestration instance.
AddNewOrchestrationEvent(context.Context, api.InstanceID, *HistoryEvent) error
Expand Down
2 changes: 1 addition & 1 deletion backend/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func (g *grpcExecutor) StartInstance(ctx context.Context, req *protos.CreateInst
defer span.End()

e := helpers.NewExecutionStartedEvent(req.Name, instanceID, req.Input, nil, helpers.TraceContextFromSpan(span))
if err := g.backend.CreateOrchestrationInstance(ctx, e); err != nil {
if err := g.backend.CreateOrchestrationInstance(ctx, e, WithOrchestrationIdReusePolicy(req.OrchestrationIdReusePolicy)); err != nil {
return nil, err
}

Expand Down
184 changes: 142 additions & 42 deletions backend/sqlite/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,9 @@ func NewSqliteBackend(opts *SqliteOptions, logger backend.Logger) backend.Backen
be.dsn = opts.FilePath
}

// used for local debug
// be.dsn = "file:file.sqlite"

return be
}

Expand Down Expand Up @@ -333,8 +336,7 @@ func (be *sqliteBackend) CompleteOrchestrationWorkItem(ctx context.Context, wi *
for _, msg := range wi.State.PendingMessages() {
if es := msg.HistoryEvent.GetExecutionStarted(); es != nil {
// Need to insert a new row into the DB
var instanceID string
if err := be.createOrchestrationInstanceInternal(ctx, msg.HistoryEvent, tx, &instanceID); err != nil {
if _, err := be.createOrchestrationInstanceInternal(ctx, msg.HistoryEvent, tx); err != nil {
if err == backend.ErrDuplicateEvent {
be.logger.Warnf(
"%v: dropping sub-orchestration creation event because an instance with the target ID (%v) already exists.",
Expand Down Expand Up @@ -390,7 +392,7 @@ func (be *sqliteBackend) CompleteOrchestrationWorkItem(ctx context.Context, wi *
}

// CreateOrchestrationInstance implements backend.Backend
func (be *sqliteBackend) CreateOrchestrationInstance(ctx context.Context, e *backend.HistoryEvent) error {
func (be *sqliteBackend) CreateOrchestrationInstance(ctx context.Context, e *backend.HistoryEvent, opts ...backend.OrchestrationIdReusePolicyOptions) error {
if err := be.ensureDB(); err != nil {
return err
}
Expand All @@ -402,7 +404,10 @@ func (be *sqliteBackend) CreateOrchestrationInstance(ctx context.Context, e *bac
defer tx.Rollback()

var instanceID string
if err := be.createOrchestrationInstanceInternal(ctx, e, tx, &instanceID); err != nil {
if instanceID, err = be.createOrchestrationInstanceInternal(ctx, e, tx, opts...); errors.Is(err, api.ErrIgnoreInstance) {
// choose to ignore, do nothing
return nil
} else if err != nil {
return err
}

Expand All @@ -429,19 +434,45 @@ func (be *sqliteBackend) CreateOrchestrationInstance(ctx context.Context, e *bac
return nil
}

func (be *sqliteBackend) createOrchestrationInstanceInternal(ctx context.Context, e *backend.HistoryEvent, tx *sql.Tx, instanceID *string) error {
func buildStatusSet(statuses []protos.OrchestrationStatus) map[protos.OrchestrationStatus]struct{} {
statusSet := make(map[protos.OrchestrationStatus]struct{})
for _, status := range statuses {
statusSet[status] = struct{}{}
}
return statusSet
}

func (be *sqliteBackend) createOrchestrationInstanceInternal(ctx context.Context, e *backend.HistoryEvent, tx *sql.Tx, opts ...backend.OrchestrationIdReusePolicyOptions) (string, error) {
if e == nil {
return errors.New("HistoryEvent must be non-nil")
return "", errors.New("HistoryEvent must be non-nil")
} else if e.Timestamp == nil {
return errors.New("HistoryEvent must have a non-nil timestamp")
return "", errors.New("HistoryEvent must have a non-nil timestamp")
}

startEvent := e.GetExecutionStarted()
if startEvent == nil {
return errors.New("HistoryEvent must be an ExecutionStartedEvent")
return "", errors.New("HistoryEvent must be an ExecutionStartedEvent")
}
instanceID := startEvent.OrchestrationInstance.InstanceId

policy := &protos.OrchestrationIdReusePolicy{}

for _, opt := range opts {
opt(policy)
}

// TODO: Support for re-using orchestration instance IDs
rows, err := insertOrIgnoreInstanceTableInternal(ctx, tx, e, startEvent)
if err != nil {
return "", err
}

if rows <= 0 {
return instanceID, be.handleInstanceExists(ctx, tx, startEvent, policy, e)
}
return instanceID, nil
}

func insertOrIgnoreInstanceTableInternal(ctx context.Context, tx *sql.Tx, e *backend.HistoryEvent, startEvent *protos.ExecutionStartedEvent) (int64, error) {
res, err := tx.ExecContext(
ctx,
`INSERT OR IGNORE INTO [Instances] (
Expand All @@ -462,19 +493,114 @@ func (be *sqliteBackend) createOrchestrationInstanceInternal(ctx context.Context
e.Timestamp.AsTime(),
)
if err != nil {
return fmt.Errorf("failed to insert into [Instances] table: %w", err)
return -1, fmt.Errorf("failed to insert into [Instances] table: %w", err)
}

rows, err := res.RowsAffected()
if err != nil {
return fmt.Errorf("failed to count the rows affected: %w", err)
return -1, fmt.Errorf("failed to count the rows affected: %w", err)
}
return rows, nil;
}

if rows <= 0 {
return backend.ErrDuplicateEvent
func (be *sqliteBackend) handleInstanceExists(ctx context.Context, tx *sql.Tx, startEvent *protos.ExecutionStartedEvent, policy *protos.OrchestrationIdReusePolicy, e *backend.HistoryEvent) error {
// query RuntimeStatus for the existing instance
queryRow := tx.QueryRowContext(
ctx,
`SELECT [RuntimeStatus] FROM Instances WHERE [InstanceID] = ?`,
startEvent.OrchestrationInstance.InstanceId,
)
var runtimeStatus *string
err := queryRow.Scan(&runtimeStatus)
if errors.Is(err, sql.ErrNoRows) {
return api.ErrInstanceNotFound
} else if err != nil {
return fmt.Errorf("failed to scan the Instances table result: %w", err)
}

// instance already exists
targetStatusValues := buildStatusSet(policy.OperationStatus)
// status not match, return instance duplicate error
if _, ok := targetStatusValues[helpers.FromRuntimeStatusString(*runtimeStatus)]; !ok {
return api.ErrDuplicateInstance
}

// status match
switch policy.Action {
case protos.CreateOrchestrationAction_IGNORE:
// Log an warning message and ignore creating new instance
be.logger.Warnf("An instance with ID '%s' already exists; dropping duplicate create request", startEvent.OrchestrationInstance.InstanceId)
return api.ErrIgnoreInstance
case protos.CreateOrchestrationAction_TERMINATE:
// terminate existing instance
if err := be.cleanupOrchestrationStateInternal(ctx, tx, api.InstanceID(startEvent.OrchestrationInstance.InstanceId),false); err != nil {
return err
}
// create a new instance
var rows int64
if rows, err = insertOrIgnoreInstanceTableInternal(ctx, tx, e, startEvent); err != nil {
return err
}

// should never happen, because we clean up instance before create new one
if rows <= 0 {
return fmt.Errorf("failed to insert into [Instances] table because entry already exists.")
}
return nil
}
// default behavior
return api.ErrDuplicateInstance
}

func (be *sqliteBackend) cleanupOrchestrationStateInternal(ctx context.Context, tx *sql.Tx, id api.InstanceID, onlyIfCompleted bool) error {
row := tx.QueryRowContext(ctx, "SELECT 1 FROM Instances WHERE [InstanceID] = ?", string(id))
if err := row.Err(); err != nil {
return fmt.Errorf("failed to query for instance existence: %w", err)
}

var unused int
if err := row.Scan(&unused); errors.Is(err, sql.ErrNoRows) {
return api.ErrInstanceNotFound
} else if err != nil {
return fmt.Errorf("failed to scan instance existence: %w", err)
}

*instanceID = startEvent.OrchestrationInstance.InstanceId
if onlyIfCompleted {
// purge orchestration in ['COMPLETED', 'FAILED', 'TERMINATED']
dbResult, err := tx.ExecContext(ctx, "DELETE FROM Instances WHERE [InstanceID] = ? AND [RuntimeStatus] IN ('COMPLETED', 'FAILED', 'TERMINATED')", string(id))
if err != nil {
return fmt.Errorf("failed to delete from the Instances table: %w", err)
}

rowsAffected, err := dbResult.RowsAffected()
if err != nil {
return fmt.Errorf("failed to get rows affected in Instances delete operation: %w", err)
}
if rowsAffected == 0 {
return api.ErrNotCompleted
}
} else {
// clean up orchestration in all [RuntimeStatus]
_, err := tx.ExecContext(ctx, "DELETE FROM Instances WHERE [InstanceID] = ?", string(id))
if err != nil {
return fmt.Errorf("failed to delete from the Instances table: %w", err)
}
}

_, err := tx.ExecContext(ctx, "DELETE FROM History WHERE [InstanceID] = ?", string(id))
if err != nil {
return fmt.Errorf("failed to delete from History table: %w", err)
}

_, err = tx.ExecContext(ctx, "DELETE FROM NewEvents WHERE [InstanceID] = ?", string(id))
if err != nil {
return fmt.Errorf("failed to delete from NewEvents table: %w", err)
}

_, err = tx.ExecContext(ctx, "DELETE FROM NewTasks WHERE [InstanceID] = ?", string(id))
if err != nil {
return fmt.Errorf("failed to delete from NewTasks table: %w", err)
}
return nil
}

Expand Down Expand Up @@ -837,34 +963,8 @@ func (be *sqliteBackend) PurgeOrchestrationState(ctx context.Context, id api.Ins
}
defer tx.Rollback()

row := tx.QueryRowContext(ctx, "SELECT 1 FROM Instances WHERE [InstanceID] = ?", string(id))
if err := row.Err(); err != nil {
return fmt.Errorf("failed to query for instance existence: %w", err)
}

var unused int
if err := row.Scan(&unused); err == sql.ErrNoRows {
return api.ErrInstanceNotFound
} else if err != nil {
return fmt.Errorf("failed to scan instance existence: %w", err)
}

dbResult, err := tx.ExecContext(ctx, "DELETE FROM Instances WHERE [InstanceID] = ? AND [RuntimeStatus] IN ('COMPLETED', 'FAILED', 'TERMINATED')", string(id))
if err != nil {
return fmt.Errorf("failed to delete from the Instances table: %w", err)
}

rowsAffected, err := dbResult.RowsAffected()
if err != nil {
return fmt.Errorf("failed to get rows affected in Instances delete operation: %w", err)
}
if rowsAffected == 0 {
return api.ErrNotCompleted
}

_, err = tx.ExecContext(ctx, "DELETE FROM History WHERE [InstanceID] = ?", string(id))
if err != nil {
return fmt.Errorf("failed to delete from History table: %w", err)
if err := be.cleanupOrchestrationStateInternal(ctx, tx, id, true); err != nil {
return err
}

if err = tx.Commit(); err != nil {
Expand Down
Loading

0 comments on commit b4a73b9

Please sign in to comment.