Fix workflow deletion #5793

Merged
merged 2 commits into from
Mar 18, 2024
97 changes: 35 additions & 62 deletions common/persistence/sql/sql_execution_store.go
@@ -625,104 +625,77 @@
ctx context.Context,
request *p.DeleteWorkflowExecutionRequest,
) error {
recoverPanic := func(recovered interface{}, err *error) {
if recovered != nil {
*err = fmt.Errorf("DB operation panicked: %v %s", recovered, debug.Stack())
}
}
dbShardID := sqlplugin.GetDBShardIDFromHistoryShardID(m.shardID, m.db.GetTotalNumDBShards())
domainID := serialization.MustParseUUID(request.DomainID)
runID := serialization.MustParseUUID(request.RunID)
wfID := request.WorkflowID
g, ctx := errgroup.WithContext(ctx)

g.Go(func() (e error) {
defer func() { recoverPanic(recover(), &e) }()
_, e = m.db.DeleteFromExecutions(ctx, &sqlplugin.ExecutionsFilter{
return m.txExecute(ctx, dbShardID, "DeleteWorkflowExecution", func(tx sqlplugin.Tx) error {
Review comment (Member):

You are not only changing the order but also making this operation a transaction.

  • Other transactional operations in this file use txExecuteShardLockedFn. Should this also grab the shard lock?
  • This transaction will perform deletions on multiple tables and will have to acquire the corresponding locks at the DB level. Compared to the previous implementation, the overall latency might be higher, which is OK for the deletion case, but do you think this could impact the performance of other queries on those tables?

Reply (Member Author):

  • It doesn't have to grab a shard lock: we're trying to delete a workflow that has passed its retention period. Whether it is deleted by the owner of the shard isn't important.
  • For InnoDB with the repeatable-read isolation level, the delete acquires a next-key lock on every record it encounters. The deletions are either point deletions or range deletions that delete all records within a range, so the performance impact on other queries on those tables should be low. (Only one workflow next to the deleted workflow could be impacted.) For InnoDB with the read-committed isolation level and for MyRocks, there is no next-key lock or gap lock; only the records being deleted are locked, so there is no impact on other workflows.

if _, err := tx.DeleteFromExecutions(ctx, &sqlplugin.ExecutionsFilter{
ShardID: m.shardID,
DomainID: domainID,
WorkflowID: wfID,
RunID: runID,
})
return e
})

g.Go(func() (e error) {
defer func() { recoverPanic(recover(), &e) }()
_, e = m.db.DeleteFromActivityInfoMaps(ctx, &sqlplugin.ActivityInfoMapsFilter{
}); err != nil {
return convertCommonErrors(tx, "DeleteWorkflowExecution", "", err)
}
if _, err := tx.DeleteFromActivityInfoMaps(ctx, &sqlplugin.ActivityInfoMapsFilter{
ShardID: int64(m.shardID),
DomainID: domainID,
WorkflowID: wfID,
RunID: runID,
})
return e
})

g.Go(func() (e error) {
defer func() { recoverPanic(recover(), &e) }()
_, e = m.db.DeleteFromTimerInfoMaps(ctx, &sqlplugin.TimerInfoMapsFilter{
}); err != nil {
return convertCommonErrors(tx, "DeleteFromActivityInfoMaps", "", err)

}
if _, err := tx.DeleteFromTimerInfoMaps(ctx, &sqlplugin.TimerInfoMapsFilter{
ShardID: int64(m.shardID),
DomainID: domainID,
WorkflowID: wfID,
RunID: runID,
})
return e
})

g.Go(func() (e error) {
defer func() { recoverPanic(recover(), &e) }()
_, e = m.db.DeleteFromChildExecutionInfoMaps(ctx, &sqlplugin.ChildExecutionInfoMapsFilter{
}); err != nil {
return convertCommonErrors(tx, "DeleteFromTimerInfoMaps", "", err)

}
if _, err := tx.DeleteFromChildExecutionInfoMaps(ctx, &sqlplugin.ChildExecutionInfoMapsFilter{
ShardID: int64(m.shardID),
DomainID: domainID,
WorkflowID: wfID,
RunID: runID,
})
return e
})

g.Go(func() (e error) {
defer func() { recoverPanic(recover(), &e) }()
_, e = m.db.DeleteFromRequestCancelInfoMaps(ctx, &sqlplugin.RequestCancelInfoMapsFilter{
}); err != nil {
return convertCommonErrors(tx, "DeleteFromChildExecutionInfoMaps", "", err)

}
if _, err := tx.DeleteFromRequestCancelInfoMaps(ctx, &sqlplugin.RequestCancelInfoMapsFilter{
ShardID: int64(m.shardID),
DomainID: domainID,
WorkflowID: wfID,
RunID: runID,
})
return e
})

g.Go(func() (e error) {
defer func() { recoverPanic(recover(), &e) }()
_, e = m.db.DeleteFromSignalInfoMaps(ctx, &sqlplugin.SignalInfoMapsFilter{
}); err != nil {
return convertCommonErrors(tx, "DeleteFromRequestCancelInfoMaps", "", err)

}
if _, err := tx.DeleteFromSignalInfoMaps(ctx, &sqlplugin.SignalInfoMapsFilter{
ShardID: int64(m.shardID),
DomainID: domainID,
WorkflowID: wfID,
RunID: runID,
})
return e
})

g.Go(func() (e error) {
defer func() { recoverPanic(recover(), &e) }()
_, e = m.db.DeleteFromBufferedEvents(ctx, &sqlplugin.BufferedEventsFilter{
}); err != nil {
return convertCommonErrors(tx, "DeleteFromSignalInfoMaps", "", err)

}
if _, err := tx.DeleteFromBufferedEvents(ctx, &sqlplugin.BufferedEventsFilter{
ShardID: m.shardID,
DomainID: domainID,
WorkflowID: wfID,
RunID: runID,
})
return e
})

g.Go(func() (e error) {
defer func() { recoverPanic(recover(), &e) }()
_, e = m.db.DeleteFromSignalsRequestedSets(ctx, &sqlplugin.SignalsRequestedSetsFilter{
}); err != nil {
return convertCommonErrors(tx, "DeleteFromBufferedEvents", "", err)

}
if _, err := tx.DeleteFromSignalsRequestedSets(ctx, &sqlplugin.SignalsRequestedSetsFilter{
ShardID: int64(m.shardID),
DomainID: domainID,
WorkflowID: wfID,
RunID: runID,
})
return e
}); err != nil {
return convertCommonErrors(tx, "DeleteFromSignalsRequestedSets", "", err)

}
return nil
})
return g.Wait()
}
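
The shape of the rewritten DeleteWorkflowExecution above (sequential deletes inside one transaction, commit on success, rollback on the first error) can be illustrated with a minimal, self-contained sketch. Everything in it is hypothetical: the `Tx` interface, `fakeTx`, `runInTx`, `deleteWorkflowRows`, and the table names are stand-ins for illustration and are not the `sqlplugin.Tx` interface or the `txExecute` helper from this PR.

```go
package main

import (
	"errors"
	"fmt"
)

// Tx is a hypothetical, minimal transaction interface for this sketch.
// It is NOT the sqlplugin.Tx interface used in the PR.
type Tx interface {
	Delete(table string) error
	Commit() error
	Rollback() error
}

// fakeTx is an in-memory stand-in that records what happened to it.
type fakeTx struct {
	failOn     string   // table whose delete should fail ("" means none)
	deleted    []string // tables deleted so far, in order
	committed  bool
	rolledBack bool
}

func (t *fakeTx) Delete(table string) error {
	if table == t.failOn {
		return errors.New("delete failed")
	}
	t.deleted = append(t.deleted, table)
	return nil
}

func (t *fakeTx) Commit() error   { t.committed = true; return nil }
func (t *fakeTx) Rollback() error { t.rolledBack = true; return nil }

// runInTx runs fn against tx, committing if fn succeeds and rolling back otherwise.
func runInTx(tx Tx, fn func(Tx) error) error {
	if err := fn(tx); err != nil {
		if rbErr := tx.Rollback(); rbErr != nil {
			return fmt.Errorf("rollback failed: %v (original error: %w)", rbErr, err)
		}
		return err
	}
	return tx.Commit()
}

// deleteWorkflowRows deletes from each table in order and stops at the first error,
// so either every delete is committed or none is.
func deleteWorkflowRows(tx Tx, tables []string) error {
	return runInTx(tx, func(tx Tx) error {
		for _, table := range tables {
			if err := tx.Delete(table); err != nil {
				return fmt.Errorf("%s: %w", table, err)
			}
		}
		return nil
	})
}

func main() {
	// Illustrative table names only.
	tables := []string{"executions", "activity_info_maps", "timer_info_maps"}

	ok := &fakeTx{}
	fmt.Println(deleteWorkflowRows(ok, tables), ok.committed, ok.rolledBack)

	bad := &fakeTx{failOn: "timer_info_maps"}
	fmt.Println(deleteWorkflowRows(bad, tables) != nil, bad.committed, bad.rolledBack)
}
```

Compared with issuing each delete independently, as the removed errgroup version did, a failure here leaves nothing partially deleted, at the cost of holding row locks for the duration of the transaction, which is the trade-off raised in the review thread.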

// it's possible for a new run of the same workflow to have started after the run we are deleting
48 changes: 36 additions & 12 deletions common/persistence/sql/sql_execution_store_test.go
@@ -1800,7 +1800,7 @@ func TestDeleteWorkflowExecution(t *testing.T) {
testCases := []struct {
name string
req *persistence.DeleteWorkflowExecutionRequest
mockSetup func(*sqlplugin.MockDB)
mockSetup func(*sqlplugin.MockDB, *sqlplugin.MockTx)
wantErr bool
}{
{
@@ -1810,59 +1810,82 @@ func TestDeleteWorkflowExecution(t *testing.T) {
WorkflowID: "wid",
RunID: "bbdcea69-61d5-44c3-9d55-afe23505a542",
},
mockSetup: func(mockDB *sqlplugin.MockDB) {
mockDB.EXPECT().DeleteFromExecutions(gomock.Any(), &sqlplugin.ExecutionsFilter{
mockSetup: func(mockDB *sqlplugin.MockDB, mockTx *sqlplugin.MockTx) {
mockDB.EXPECT().GetTotalNumDBShards().Return(1)
mockDB.EXPECT().BeginTx(gomock.Any(), gomock.Any()).Return(mockTx, nil)
mockTx.EXPECT().DeleteFromExecutions(gomock.Any(), &sqlplugin.ExecutionsFilter{
ShardID: int(shardID),
DomainID: serialization.MustParseUUID("abdcea69-61d5-44c3-9d55-afe23505a542"),
WorkflowID: "wid",
RunID: serialization.MustParseUUID("bbdcea69-61d5-44c3-9d55-afe23505a542"),
}).Return(nil, nil)
mockDB.EXPECT().DeleteFromActivityInfoMaps(gomock.Any(), &sqlplugin.ActivityInfoMapsFilter{
mockTx.EXPECT().DeleteFromActivityInfoMaps(gomock.Any(), &sqlplugin.ActivityInfoMapsFilter{
ShardID: shardID,
DomainID: serialization.MustParseUUID("abdcea69-61d5-44c3-9d55-afe23505a542"),
WorkflowID: "wid",
RunID: serialization.MustParseUUID("bbdcea69-61d5-44c3-9d55-afe23505a542"),
}).Return(nil, nil)
mockDB.EXPECT().DeleteFromTimerInfoMaps(gomock.Any(), &sqlplugin.TimerInfoMapsFilter{
mockTx.EXPECT().DeleteFromTimerInfoMaps(gomock.Any(), &sqlplugin.TimerInfoMapsFilter{
ShardID: shardID,
DomainID: serialization.MustParseUUID("abdcea69-61d5-44c3-9d55-afe23505a542"),
WorkflowID: "wid",
RunID: serialization.MustParseUUID("bbdcea69-61d5-44c3-9d55-afe23505a542"),
}).Return(nil, nil)
mockDB.EXPECT().DeleteFromChildExecutionInfoMaps(gomock.Any(), &sqlplugin.ChildExecutionInfoMapsFilter{
mockTx.EXPECT().DeleteFromChildExecutionInfoMaps(gomock.Any(), &sqlplugin.ChildExecutionInfoMapsFilter{
ShardID: shardID,
DomainID: serialization.MustParseUUID("abdcea69-61d5-44c3-9d55-afe23505a542"),
WorkflowID: "wid",
RunID: serialization.MustParseUUID("bbdcea69-61d5-44c3-9d55-afe23505a542"),
}).Return(nil, nil)
mockDB.EXPECT().DeleteFromRequestCancelInfoMaps(gomock.Any(), &sqlplugin.RequestCancelInfoMapsFilter{
mockTx.EXPECT().DeleteFromRequestCancelInfoMaps(gomock.Any(), &sqlplugin.RequestCancelInfoMapsFilter{
ShardID: shardID,
DomainID: serialization.MustParseUUID("abdcea69-61d5-44c3-9d55-afe23505a542"),
WorkflowID: "wid",
RunID: serialization.MustParseUUID("bbdcea69-61d5-44c3-9d55-afe23505a542"),
}).Return(nil, nil)
mockDB.EXPECT().DeleteFromSignalInfoMaps(gomock.Any(), &sqlplugin.SignalInfoMapsFilter{
mockTx.EXPECT().DeleteFromSignalInfoMaps(gomock.Any(), &sqlplugin.SignalInfoMapsFilter{
ShardID: shardID,
DomainID: serialization.MustParseUUID("abdcea69-61d5-44c3-9d55-afe23505a542"),
WorkflowID: "wid",
RunID: serialization.MustParseUUID("bbdcea69-61d5-44c3-9d55-afe23505a542"),
}).Return(nil, nil)
mockDB.EXPECT().DeleteFromBufferedEvents(gomock.Any(), &sqlplugin.BufferedEventsFilter{
mockTx.EXPECT().DeleteFromBufferedEvents(gomock.Any(), &sqlplugin.BufferedEventsFilter{
ShardID: int(shardID),
DomainID: serialization.MustParseUUID("abdcea69-61d5-44c3-9d55-afe23505a542"),
WorkflowID: "wid",
RunID: serialization.MustParseUUID("bbdcea69-61d5-44c3-9d55-afe23505a542"),
}).Return(nil, nil)
mockDB.EXPECT().DeleteFromSignalsRequestedSets(gomock.Any(), &sqlplugin.SignalsRequestedSetsFilter{
mockTx.EXPECT().DeleteFromSignalsRequestedSets(gomock.Any(), &sqlplugin.SignalsRequestedSetsFilter{
ShardID: shardID,
DomainID: serialization.MustParseUUID("abdcea69-61d5-44c3-9d55-afe23505a542"),
WorkflowID: "wid",
RunID: serialization.MustParseUUID("bbdcea69-61d5-44c3-9d55-afe23505a542"),
}).Return(nil, nil)

mockTx.EXPECT().Commit().Return(nil)
},
wantErr: false,
},
{
name: "Error case - failed to delete from executions",
req: &persistence.DeleteWorkflowExecutionRequest{
DomainID: "abdcea69-61d5-44c3-9d55-afe23505a542",
WorkflowID: "wid",
RunID: "bbdcea69-61d5-44c3-9d55-afe23505a542",
},
mockSetup: func(mockDB *sqlplugin.MockDB, mockTx *sqlplugin.MockTx) {
mockDB.EXPECT().GetTotalNumDBShards().Return(1)
mockDB.EXPECT().BeginTx(gomock.Any(), gomock.Any()).Return(mockTx, nil)
mockTx.EXPECT().DeleteFromExecutions(gomock.Any(), &sqlplugin.ExecutionsFilter{
ShardID: int(shardID),
DomainID: serialization.MustParseUUID("abdcea69-61d5-44c3-9d55-afe23505a542"),
WorkflowID: "wid",
RunID: serialization.MustParseUUID("bbdcea69-61d5-44c3-9d55-afe23505a542"),
}).Return(nil, errors.New("some error"))
mockTx.EXPECT().IsNotFoundError(gomock.Any()).Return(true)
mockTx.EXPECT().Rollback().Return(nil)
},
wantErr: true,
},
}

for _, tc := range testCases {
@@ -1871,10 +1894,11 @@ func TestDeleteWorkflowExecution(t *testing.T) {
defer ctrl.Finish()

mockDB := sqlplugin.NewMockDB(ctrl)
mockTx := sqlplugin.NewMockTx(ctrl)
store, err := NewSQLExecutionStore(mockDB, nil, int(shardID), nil, nil)
require.NoError(t, err, "failed to create execution store")

tc.mockSetup(mockDB)
tc.mockSetup(mockDB, mockTx)

err = store.DeleteWorkflowExecution(context.Background(), tc.req)
if tc.wantErr {
23 changes: 13 additions & 10 deletions service/history/task/timer_task_executor_base.go
@@ -132,21 +132,23 @@
msBuilder execution.MutableState,
) error {

if err := t.deleteCurrentWorkflowExecution(ctx, task); err != nil {
if err := t.deleteWorkflowHistory(ctx, task, msBuilder); err != nil {
return err
}

if err := t.deleteWorkflowExecution(ctx, task); err != nil {
if err := t.deleteWorkflowVisibility(ctx, task); err != nil {
return err
}

if err := t.deleteWorkflowHistory(ctx, task, msBuilder); err != nil {
if err := t.deleteCurrentWorkflowExecution(ctx, task); err != nil {
return err
}

if err := t.deleteWorkflowVisibility(ctx, task); err != nil {
// it must be the last one due to the nature of workflow execution deletion
if err := t.deleteWorkflowExecution(ctx, task); err != nil {
return err
}

// calling clear here to force accesses of mutable state to read database
// if this is not called then callers will get mutable state even though it's been removed from database
context.Clear()
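
The reordering above can be sketched as an ordered list of cleanup steps that stops at the first failure. This is a minimal illustration, not the executor's code: the `step` type, `runCleanup`, and `errTransient` are hypothetical, and the step functions are no-op stand-ins that only borrow the helper names from the diff. The rationale assumed here, that the execution record must survive until the end so a retried task can still find the workflow, is one reading of the "it must be the last one" comment rather than something the diff states.

```go
package main

import (
	"errors"
	"fmt"
)

// errTransient simulates a recoverable failure in one cleanup step.
var errTransient = errors.New("transient failure")

// step is one named cleanup action (hypothetical type for this sketch).
type step struct {
	name string
	run  func() error
}

// runCleanup executes steps in order and stops at the first failure.
// It returns the names of the steps that completed before the failure.
func runCleanup(steps []step) ([]string, error) {
	var done []string
	for _, s := range steps {
		if err := s.run(); err != nil {
			return done, fmt.Errorf("%s: %w", s.name, err)
		}
		done = append(done, s.name)
	}
	return done, nil
}

func main() {
	noop := func() error { return nil }
	fail := func() error { return errTransient }

	steps := []step{
		{name: "deleteWorkflowHistory", run: noop},
		{name: "deleteWorkflowVisibility", run: fail}, // simulated failure
		{name: "deleteCurrentWorkflowExecution", run: noop},
		// Last on purpose: assumed to be the record a retry depends on.
		{name: "deleteWorkflowExecution", run: noop},
	}

	done, err := runCleanup(steps)
	fmt.Println("completed:", done)
	fmt.Println("error:", err)
}
```

With this ordering, a failure in an earlier step returns before the execution record is touched, so the whole sequence can be attempted again.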
@@ -197,12 +199,6 @@
return err
}

if err := t.deleteCurrentWorkflowExecution(ctx, task); err != nil {
return err
}
if err := t.deleteWorkflowExecution(ctx, task); err != nil {
return err
}
// delete workflow history if history archival is not needed or history has been archived inline
if resp.HistoryArchivedInline {
t.metricsClient.IncCounter(metrics.HistoryProcessDeleteHistoryEventScope, metrics.WorkflowCleanupDeleteHistoryInlineCount)
@@ -215,6 +211,13 @@
if err := t.deleteWorkflowVisibility(ctx, task); err != nil {
return err
}

if err := t.deleteCurrentWorkflowExecution(ctx, task); err != nil {
return err

}
if err := t.deleteWorkflowExecution(ctx, task); err != nil {
return err

}
// calling clear here to force accesses of mutable state to read database
// if this is not called then callers will get mutable state even though it's been removed from database
workflowContext.Clear()