Enabling service-discovery-driven shutdown of matching engine #6198

Merged
25 commits:
58eb90c  WIP commit (davidporter-id-au, Jul 30, 2024)
a8240d0  fixing some logs (davidporter-id-au, Jul 30, 2024)
6e9c59b  Add recover (davidporter-id-au, Jul 30, 2024)
c974255  Merge branch 'master' into bugfix/enabling-shutdown-through-service-d… (davidporter-id-au, Aug 5, 2024)
6acc273  WIP, snapshotting (davidporter-id-au, Aug 6, 2024)
4ff1e16  Moving this to engine (davidporter-id-au, Aug 6, 2024)
9c048eb  debugging (davidporter-id-au, Aug 6, 2024)
165b111  wip (davidporter-id-au, Aug 7, 2024)
f3efac6  Merge branch 'master' into bugfix/enabling-shutdown-through-service-d… (davidporter-id-au, Aug 7, 2024)
b1b77c3  Fix and add a small amount of coverage (davidporter-id-au, Aug 7, 2024)
bc10553  removing (davidporter-id-au, Aug 7, 2024)
7d4e592  Update service/matching/handler/engine.go (davidporter-id-au, Aug 8, 2024)
3236cc8  Update service/matching/handler/engine.go (davidporter-id-au, Aug 8, 2024)
84098f3  fixing some tests (davidporter-id-au, Aug 8, 2024)
c21d56a  Fix config (davidporter-id-au, Aug 8, 2024)
f695c38  Update service/matching/handler/membership.go (davidporter-id-au, Aug 8, 2024)
4d870b1  fixing test (davidporter-id-au, Aug 13, 2024)
9adba98  lint (davidporter-id-au, Aug 13, 2024)
8e22562  Merge branch 'master' into bugfix/enabling-shutdown-through-service-d… (davidporter-id-au, Aug 13, 2024)
2eeaca5  feedback (davidporter-id-au, Aug 13, 2024)
40f716d  fixing up (davidporter-id-au, Aug 14, 2024)
2f74aec  fix copy of sync wg (davidporter-id-au, Aug 14, 2024)
c36df6a  coverage (davidporter-id-au, Aug 14, 2024)
15ec5ca  maybe fix data race (davidporter-id-au, Aug 14, 2024)
25e649b  race debugging (davidporter-id-au, Aug 14, 2024)
12 changes: 12 additions & 0 deletions common/dynamicconfig/constants.go
@@ -1672,6 +1672,13 @@ const (
// Default value: false
// Allowed filters: DomainID
MatchingEnableTaskInfoLogByDomainID
// MatchingEnableTasklistGuardAgainstOwnershipShardLoss
// enables guards that stop tasklists from processing when there is any indication that the host
// is no longer active in the ring or no longer owns the shard
// KeyName: matching.enableTasklistGuardAgainstOwnershipLoss
// Value type: Bool
// Default value: true
MatchingEnableTasklistGuardAgainstOwnershipShardLoss

// key for history

@@ -4102,6 +4109,11 @@ var BoolKeys = map[BoolKey]DynamicBool{
Description: "MatchingEnableTaskInfoLogByDomainID is enables info level logs for decision/activity task based on the request domainID",
DefaultValue: false,
},
MatchingEnableTasklistGuardAgainstOwnershipShardLoss: {
KeyName: "matching.enableTasklistGuardAgainstOwnershipLoss",
Description: "allows guards to ensure that tasklists don't continue processing if there's a signal that they've lost ownership",
DefaultValue: true,
},
EventsCacheGlobalEnable: {
KeyName: "history.eventsCacheGlobalEnable",
Description: "EventsCacheGlobalEnable is enables global cache over all history shards",
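The new key is registered like any other matching bool property. As a minimal sketch (not part of the diff; names taken from the constants above), this is how such a flag is typically read through a dynamicconfig.Collection, so flipping the key takes effect without a redeploy:

```go
// Sketch: GetBoolProperty returns a BoolPropertyFn; calling it re-evaluates
// the dynamic value, so the guard can be toggled at runtime.
func ownershipGuardEnabled(dc *dynamicconfig.Collection) bool {
	enabled := dc.GetBoolProperty(dynamicconfig.MatchingEnableTasklistGuardAgainstOwnershipShardLoss)
	return enabled()
}
```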
10 changes: 5 additions & 5 deletions common/errors/taskListNotOwnedByHostError.go
@@ -24,21 +24,21 @@ package errors

import "fmt"

-var _ error = &TaskListNotOwnnedByHostError{}
+var _ error = &TaskListNotOwnedByHostError{}

-type TaskListNotOwnnedByHostError struct {
+type TaskListNotOwnedByHostError struct {
 	OwnedByIdentity string
 	MyIdentity      string
 	TasklistName    string
 }

-func (m *TaskListNotOwnnedByHostError) Error() string {
+func (m *TaskListNotOwnedByHostError) Error() string {
 	return fmt.Sprintf("task list is not owned by this host: OwnedBy: %s, Me: %s, Tasklist: %s",
 		m.OwnedByIdentity, m.MyIdentity, m.TasklistName)
 }

-func NewTaskListNotOwnnedByHostError(ownedByIdentity string, myIdentity string, tasklistName string) *TaskListNotOwnnedByHostError {
-	return &TaskListNotOwnnedByHostError{
+func NewTaskListNotOwnedByHostError(ownedByIdentity string, myIdentity string, tasklistName string) *TaskListNotOwnedByHostError {
+	return &TaskListNotOwnedByHostError{
 		OwnedByIdentity: ownedByIdentity,
 		MyIdentity:      myIdentity,
 		TasklistName:    tasklistName,
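A hedged usage sketch (not part of the diff): because the constructor returns a typed error, callers can detect ownership loss with errors.As and re-route the request. The field names come from the struct above; the cadence_errors alias and import path are assumptions matching how engine.go refers to this package below.

```go
import (
	"errors"
	"log"

	cadence_errors "github.com/uber/cadence/common/errors" // path assumed
)

func handleLookupErr(err error) {
	var notOwned *cadence_errors.TaskListNotOwnedByHostError
	if errors.As(err, &notOwned) {
		// The tasklist moved: OwnedByIdentity says where it lives now.
		log.Printf("tasklist %q owned by %s, not %s; retry against the new owner",
			notOwned.TasklistName, notOwned.OwnedByIdentity, notOwned.MyIdentity)
	}
}
```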
6 changes: 6 additions & 0 deletions common/log/tag/tags.go
@@ -956,6 +956,12 @@ func VisibilityQuery(query string) Tag {
return newStringTag("visibility-query", query)
}

// MembershipChangeEvent is a predefined tag for logging hashring change events,
// expected to be of type membership.ChangeEvent
func MembershipChangeEvent(event interface{}) Tag {
return newPredefinedDynamicTag("membership-change-event", event)
}

// Dynamic Uses reflection based logging for arbitrary values
// for not very performant logging
func Dynamic(key string, v interface{}) Tag {
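A short usage sketch (assumed, modeled on the other tag helpers in this file; log.Logger and membership.ChangeEvent are the types the doc comment above implies):

```go
// Sketch: attach a membership change event to a structured log line.
func logMembershipChange(logger log.Logger, event membership.ChangeEvent) {
	logger.Info("hashring membership changed", tag.MembershipChangeEvent(event))
}
```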
3 changes: 3 additions & 0 deletions service/matching/config/config.go
@@ -79,6 +79,8 @@ type (
TaskDispatchRPSTTL time.Duration
// task gc configuration
MaxTimeBetweenTaskDeletes time.Duration

EnableTasklistOwnershipGuard dynamicconfig.BoolPropertyFn
}

ForwarderConfig struct {
@@ -154,6 +156,7 @@ func NewConfig(dc *dynamicconfig.Collection, hostName string) *Config {
EnableTasklistIsolation: dc.GetBoolPropertyFilteredByDomain(dynamicconfig.EnableTasklistIsolation),
AllIsolationGroups: mapIGs(dc.GetListProperty(dynamicconfig.AllIsolationGroups)()),
AsyncTaskDispatchTimeout: dc.GetDurationPropertyFilteredByTaskListInfo(dynamicconfig.AsyncTaskDispatchTimeout),
EnableTasklistOwnershipGuard: dc.GetBoolProperty(dynamicconfig.MatchingEnableTasklistGuardAgainstOwnershipShardLoss),
HostName: hostName,
TaskDispatchRPS: 100000.0,
TaskDispatchRPSTTL: time.Minute,
160 changes: 139 additions & 21 deletions service/matching/handler/engine.go
@@ -78,6 +78,7 @@ type (
}

matchingEngineImpl struct {
shutdown chan struct{}
taskManager persistence.TaskManager
clusterMetadata cluster.Metadata
historyService history.Client
@@ -120,7 +121,8 @@ var (
var _ Engine = (*matchingEngineImpl)(nil) // Asserts that interface is indeed implemented

// NewEngine creates an instance of matching engine
-func NewEngine(taskManager persistence.TaskManager,
+func NewEngine(
+	taskManager persistence.TaskManager,
clusterMetadata cluster.Metadata,
historyService history.Client,
matchingClient matching.Client,
@@ -132,7 +134,9 @@ func NewEngine(taskManager persistence.TaskManager,
partitioner partition.Partitioner,
timeSource clock.TimeSource,
) Engine {

e := &matchingEngineImpl{
shutdown: make(chan struct{}),
taskManager: taskManager,
clusterMetadata: clusterMetadata,
historyService: historyService,
@@ -149,15 +153,20 @@ func NewEngine(taskManager persistence.TaskManager,
partitioner: partitioner,
timeSource: timeSource,
}

go e.subscribeToMembershipChanges()

e.waitForQueryResultFn = e.waitForQueryResult
return e
}

func (e *matchingEngineImpl) Start() {
// As task lists are initialized lazily, nothing else needs to happen on startup.
// Reset the shutdown channel in case there are any existing listeners.
e.shutdown = make(chan struct{})
}

func (e *matchingEngineImpl) Stop() {
close(e.shutdown)
Member: We're not waiting for subscribeToMembershipChanges to complete; possibly a goroutine leak?

Member (author): This is true: I was too lazy and didn't think to add a full WaitGroup setup to the engine. What do you think? It does mean that I didn't toggle it on for the leak detector.

Member: +1 for not leaving goroutines behind. Let's wait here until subscribeToMembershipChanges returns (via a WaitGroup).

Member (author): OK, I don't love adding a bunch of complexity back to the shutdown process, but fair. Added a WaitGroup.
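A minimal sketch of the WaitGroup arrangement agreed above (names assumed; the PR's final wiring may differ slightly): the listener goroutine is registered before it starts, and Stop() joins it after closing the shutdown channel.

```go
package main

import "sync"

type engine struct {
	shutdown   chan struct{}
	shutdownWG sync.WaitGroup
}

// subscribeToMembershipChanges stands in for the real event loop,
// which selects on membership events and on e.shutdown.
func (e *engine) subscribeToMembershipChanges() {
	<-e.shutdown
}

func (e *engine) Start() {
	e.shutdown = make(chan struct{})
	e.shutdownWG.Add(1)
	go func() {
		defer e.shutdownWG.Done()
		e.subscribeToMembershipChanges()
	}()
}

func (e *engine) Stop() {
	close(e.shutdown)
	e.shutdownWG.Wait() // listener observed the close; no goroutine left behind
}

func main() {
	e := &engine{}
	e.Start()
	e.Stop()
}
```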

// Executes Stop() on each task list outside of lock
for _, l := range e.getTaskLists(math.MaxInt32) {
l.Stop()
@@ -200,26 +209,9 @@ func (e *matchingEngineImpl) getTaskListManager(taskList *tasklist.Identifier, t
}
e.taskListsLock.RUnlock()

-	// Defensive check to make sure we actually own the task list
-	// If we try to create a task list manager for a task list that is not owned by us, return an error
-	// The new task list manager will steal the task list from the current owner, which should only happen if
-	// the task list is owned by the current host.
-	taskListOwner, err := e.membershipResolver.Lookup(service.Matching, taskList.GetName())
-	if err != nil {
-		return nil, fmt.Errorf("failed to lookup task list owner: %w", err)
-	}
-
-	self, err := e.membershipResolver.WhoAmI()
+	err := e.guardAgainstShardLoss(taskList)
 	if err != nil {
-		return nil, fmt.Errorf("failed to lookup self im membership: %w", err)
-	}
-
-	if taskListOwner.Identity() != self.Identity() {
-		return nil, cadence_errors.NewTaskListNotOwnnedByHostError(
-			taskListOwner.Identity(),
-			self.Identity(),
-			taskList.GetName(),
-		)
+		return nil, err
}

// If it gets here, write lock and check again in case a task list is created between the two locks
@@ -1202,6 +1194,132 @@ func (e *matchingEngineImpl) emitInfoOrDebugLog(
}
}

func (e *matchingEngineImpl) shutDownNonOwnedTasklists() error {
if !e.config.EnableTasklistOwnershipGuard() {
return nil
}
noLongerOwned, err := e.getNonOwnedTasklistsLocked()
Member: Preemptively stopping tasklists based on the service-discovery result might cause a "freeze" on a tasklist whose new owner hasn't claimed it yet. Ideally the old host keeps operating on it until the new owner claims the lease. Is that something we can easily check from the DB? (I guess not.)

Member (author): I'd expect the duration of that wait to be the startup time of a shard plus the latency of service-discovery propagation (internally for us, 2 seconds). I do think it's a slight risk, but that's not far off the duration of an LWT, so I'm not sure a CAS operation to poll, or something similar, would be worthwhile.

Member: I think this change should perform better than what we have right now, but the ideal implementation should also check that the new owner is active. We can try that as a follow-up. Do you have plans to make the simulation framework support ring-change scenarios?

Member (author): I don't have plans to personally; I need to move on to a different project, but I do think that's a good idea.

if err != nil {
return err
}

for _, tl := range noLongerOwned {
// for each of the tasklists that are no longer owned, kick off the
// process of stopping them. The stopping process is IO heavy and
// can take a while, so do them in parallel so as to not hold the
// lock too long.
Member: Which lock is being held here? If this comment belongs to a previous iteration, let's just stop the tasklists synchronously. It's better not to return from shutDownNonOwnedTasklists while background goroutines are still stopping task lists: shutDownNonOwnedTasklists may be called again via the ticker, attempt the same or overlapping work, and cause unexpected problems.

Member (author): It does refer to an earlier iteration where it was locking, so let me fix that. My concern with doing it serially is that the net time to shut down would be quite slow; the tlmgr.Close() method does a lot of IO. What about throwing in a WaitGroup and doing it in parallel?

Member: Doing it in parallel should be fine, I think; just avoid leaving goroutines behind before returning.

go func(tl tasklist.Manager) {

defer func() {
if r := recover(); r != nil {
e.logger.Error("panic occurred while trying to shut down tasklist", tag.Dynamic("recovered-panic", r))
}
}()

e.removeTaskListManager(tl)

e.logger.Info("shutting down tasklist preemptively because they are no longer owned by this host",
tag.WorkflowTaskListName(tl.TaskListID().GetName()),
tag.WorkflowDomainID(tl.TaskListID().GetDomainID()),
tag.Dynamic("tasklist-debug-info", tl.String()),
)

tl.Stop()
}(tl)
}
return nil
}
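A minimal sketch of the fan-out/join pattern the reviewers settled on (an assumed shape, not the final code in this PR): stop managers in parallel, since Stop() is IO-heavy, but block until all of them return so a later tick of the guard cannot overlap in-flight stops. The stopper interface stands in for tasklist.Manager.

```go
package main

import "sync"

type stopper interface{ Stop() }

// stopAll runs each Stop() concurrently and waits for every one to return,
// so the caller never leaves shutdown goroutines behind.
func stopAll(managers []stopper) {
	var wg sync.WaitGroup
	for _, m := range managers {
		wg.Add(1)
		go func(m stopper) {
			defer wg.Done()
			m.Stop()
		}(m)
	}
	wg.Wait()
}
```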

func (e *matchingEngineImpl) getNonOwnedTasklistsLocked() ([]tasklist.Manager, error) {
if !e.config.EnableTasklistOwnershipGuard() {
return nil, nil
}

var toShutDown []tasklist.Manager

e.taskListsLock.RLock()
defer e.taskListsLock.RUnlock()

self, err := e.membershipResolver.WhoAmI()
if err != nil {
return nil, fmt.Errorf("failed to lookup self im membership: %w", err)
}

for tl, manager := range e.taskLists {
taskListOwner, err := e.membershipResolver.Lookup(service.Matching, tl.GetName())
if err != nil {
return nil, fmt.Errorf("failed to lookup task list owner: %w", err)
}

if taskListOwner.Identity() != self.Identity() {
toShutDown = append(toShutDown, manager)
}
}

e.logger.Info("Got list of non-owned-tasklists",
tag.Dynamic("tasklist-debug-info", toShutDown),
)
return toShutDown, nil
}

func (e *matchingEngineImpl) guardAgainstShardLoss(taskList *tasklist.Identifier) error {
if !e.config.EnableTasklistOwnershipGuard() {
return nil
}

self, err := e.membershipResolver.WhoAmI()
if err != nil {
return fmt.Errorf("failed to lookup self im membership: %w", err)
}

if e.isShuttingDown() {
e.logger.Warn("request to get tasklist is being rejected be cause engine is Host is shut down",
tag.WorkflowDomainID(taskList.GetDomainID()),
tag.WorkflowTaskListType(taskList.GetType()),
tag.WorkflowTaskListName(taskList.GetName()),
)

return cadence_errors.NewTaskListNotOwnedByHostError(
"not known",
self.Identity(),
taskList.GetName(),
)
}

// Defensive check to make sure we actually own the task list
// If we try to create a task list manager for a task list that is not owned by us, return an error
// The new task list manager will steal the task list from the current owner, which should only happen if
// the task list is owned by the current host.
taskListOwner, err := e.membershipResolver.Lookup(service.Matching, taskList.GetName())
if err != nil {
return fmt.Errorf("failed to lookup task list owner: %w", err)
}

if taskListOwner.Identity() != self.Identity() {
e.logger.Warn("Request to get tasklist is being rejected be cause engine does not own this shard",
tag.WorkflowDomainID(taskList.GetDomainID()),
tag.WorkflowTaskListType(taskList.GetType()),
tag.WorkflowTaskListName(taskList.GetName()),
)
return cadence_errors.NewTaskListNotOwnedByHostError(
taskListOwner.Identity(),
self.Identity(),
taskList.GetName(),
)
}

return nil
}

func (e *matchingEngineImpl) isShuttingDown() bool {
select {
case <-e.shutdown:
return true
default:
return false
}
}
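For readers unfamiliar with the idiom: a receive from a closed channel succeeds immediately, so the select above is a lock-free, non-blocking shutdown probe. A standalone demonstration (illustrative only, not from the PR):

```go
package main

import "fmt"

func main() {
	shutdown := make(chan struct{})
	probe := func() bool {
		select {
		case <-shutdown: // only succeeds once the channel is closed
			return true
		default: // channel still open: don't block, report "running"
			return false
		}
	}
	fmt.Println(probe()) // false
	close(shutdown)
	fmt.Println(probe()) // true
}
```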

func (m *lockableQueryTaskMap) put(key string, value chan *queryResult) {
m.Lock()
defer m.Unlock()
2 changes: 1 addition & 1 deletion service/matching/handler/engine_integration_test.go
@@ -1317,7 +1317,7 @@ func (s *matchingEngineSuite) TestGetTaskListManager_OwnerShip() {
name: "Not owned by current host",
lookUpResult: "A",
whoAmIResult: "B",
-expectedError: new(cadence_errors.TaskListNotOwnnedByHostError),
+expectedError: new(cadence_errors.TaskListNotOwnedByHostError),
},
{
name: "LookupError",