Task manager - task cleanup on passive side using task completer #6514
Conversation
}

if !errors.Is(err, errDomainIsActive) && !errors.Is(err, errTaskNotStarted) {
	tc.logger.Error("Error completing task on domain's standby cluster", tag.Error(err))
Nit: Maybe emit a metric here? This seems like a weird case that we might want to monitor
Context timeouts would fall into this branch, which would happen during deployments/restarts. Still fine to log in those cases.
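For illustration, a minimal sketch of what emitting a metric in this branch could look like, assuming a hypothetical counter name (`standbyTaskCompleterErrorCounter` is not an existing metric) and the same scope/tag helpers used elsewhere in this diff:

```go
// Sketch only: the counter name below is hypothetical.
if !errors.Is(err, errDomainIsActive) && !errors.Is(err, errTaskNotStarted) {
	tc.logger.Error("Error completing task on domain's standby cluster", tag.Error(err))
	// Count unexpected completion failures (anything other than "domain is
	// active" or "task not started") so they are visible on dashboards.
	tc.scope.Tagged(metrics.DomainTag(task.domainName)).IncCounter(standbyTaskCompleterErrorCounter)
}
```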
}

tc.scope.
	Tagged(metrics.DomainTag(task.domainName)).
giga-nit: I think we might already get these tags from the current scope, since we're using the task list manager's scope. If not, I think it would make sense to construct a new scope so that all of these metrics carry a consistent set of dimensions even if we forget to include the tags somewhere.
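If the tags don't already come from the task list manager's scope, one way to guarantee consistent dimensions (a sketch with hypothetical field and constructor names) is to build the tagged scope once when the completer is constructed:

```go
// Hypothetical constructor: tag the scope once so every metric emitted by the
// completer automatically carries the same dimensions.
func newTaskCompleter(baseScope metrics.Scope, domainName string, logger log.Logger) *taskCompleter {
	return &taskCompleter{
		scope:  baseScope.Tagged(metrics.DomainTag(domainName)),
		logger: logger,
	}
}
```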
if errors.As(err, new(*types.EntityNotExistsError)) {
	return nil
} else if err != nil {
	return fmt.Errorf("unable to fetch workflow execution from the history service: %w", err)
I'm not sure I understand treating not-found as a special case?
Mind adding a log here with some info such as the workflow ID and so forth? This would almost certainly be a bug, but in cases where we see unstuck tasklists it's foreseeable. We probably want to know about such events, for both types of errors.
The case here is that it could take a while for the task list manager on the standby side to actually attempt to process the task (although it's unlikely), and depending on the retention period for the workflow, we could be trying to get information about a workflow that is no longer present in the db. Does that make sense?
I would log it as a warn at least; I'd be pretty surprised for the average retention (7 days) to exceed the lifetime of something in the tasks table.
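A sketch of what that could look like, with the workflow identifiers included; the field names on `task` and the exact tag helpers used here are assumptions:

```go
// Sketch: surface the not-found case at warn level with identifying info
// before treating it as a successful completion.
if errors.As(err, new(*types.EntityNotExistsError)) {
	tc.logger.Warn("Workflow no longer exists while completing task on standby cluster",
		tag.WorkflowDomainName(task.domainName),
		tag.WorkflowID(task.event.WorkflowID),
		tag.WorkflowRunID(task.event.RunID),
		tag.Error(err),
	)
	return nil
} else if err != nil {
	return fmt.Errorf("unable to fetch workflow execution from the history service: %w", err)
}
```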
return fmt.Errorf("unable to fetch domain from cache: %w", err) | ||
} | ||
|
||
if _, err = domainEntry.IsActiveIn(c.clusterMetadata.GetCurrentClusterName()); err == nil { |
Let's also check the returned bool value to determine active vs. passive. The current implementation of IsActiveIn() always returns an error when it returns a false value, but that doesn't have to be the case in the future. I checked usages of IsActiveIn() on the history side, which use the bool and ignore the err.
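Something along these lines (a sketch; the use of errDomainIsActive comes from the earlier hunk, the rest of the control flow is assumed):

```go
// Rely on the returned bool rather than on err being non-nil, mirroring how
// the history side uses IsActiveIn. Today a false value always comes with an
// error, but that coupling is not guaranteed to hold in the future.
isActive, _ := domainEntry.IsActiveIn(c.clusterMetadata.GetCurrentClusterName())
if isActive {
	// Domain is active in this cluster: nothing for the passive-side completer to do.
	return errDomainIsActive
}
```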
+1
What changed?
This change adds the TaskCompleter to the TaskListManager.
The TaskCompleter is used to clean up started tasks in the domain's standby cluster. For every task dispatched from the TaskReader to the TaskMatcher, the TaskCompleter checks whether the current cluster is the domain's standby cluster. If it's not (i.e. it's the active cluster), the task is dispatched to MustOffer in the TaskMatcher as it normally is. If it is the standby cluster, the TaskCompleter fetches the WorkflowExecution using the DescribeWorkflowExecution history endpoint and checks whether the task has been started in the active cluster. If it hasn't, it retries. If it has, it marks the task completed the same way it's currently done on the active side, which advances the ackLevel and triggers the gc.
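A simplified sketch of that dispatch path; aside from MustOffer and the sentinel errors visible in the diff, the wiring and method names below are assumptions:

```go
// Simplified illustration of how the completer could sit in the dispatch path.
func (tlm *taskListManagerImpl) dispatchTask(ctx context.Context, task *InternalTask) error {
	err := tlm.taskCompleter.CompleteTaskIfStarted(ctx, task)
	switch {
	case errors.Is(err, errDomainIsActive):
		// Active cluster: dispatch to the matcher exactly as before.
		return tlm.matcher.MustOffer(ctx, task)
	case errors.Is(err, errTaskNotStarted):
		// Standby cluster, but the task has not started on the active side yet: retry later.
		return err
	case err != nil:
		return err
	default:
		// Standby cluster and the task already started on the active side:
		// the completer marked it complete, advancing the ackLevel.
		return nil
	}
}
```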
Why?
Cadence does not process activity/decision tasks on a domain's standby cluster; it only processes query tasks. That's by design and works as intended. But it has the side effect that tasks added to the tasks table rely on their TTL to be removed from it. This side effect contributes to:
This change aims to actively complete (remove from the database) tasks that have already been started in the active cluster. It does so sequentially, the same way the active cluster does: leveraging the advance of the ackLevel and performing range deletions via the garbage collector, which minimizes database calls and possible performance issues (such as tombstones in Cassandra).
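To illustrate the mechanism rather than the exact implementation, a conceptual sketch of ack-level-driven cleanup (all names below are illustrative only):

```go
// Conceptual sketch: completing tasks in order lets the ack level advance
// monotonically, and a single range delete then removes everything below it.
type ackManager struct {
	ackLevel         int64 // highest task ID up to which all tasks are complete
	lastDeletedLevel int64 // ack level at the time of the last range delete
	gcThreshold      int64 // how far ackLevel may advance before triggering GC
}

func (m *ackManager) completeTask(taskID int64, rangeDelete func(upTo int64) error) error {
	// Tasks are completed sequentially, so the ack level simply advances.
	if taskID > m.ackLevel {
		m.ackLevel = taskID
	}
	// One range deletion removes many rows at once instead of a per-task
	// delete, keeping database calls (and Cassandra tombstones) low.
	if m.ackLevel-m.lastDeletedLevel >= m.gcThreshold {
		if err := rangeDelete(m.ackLevel); err != nil {
			return err
		}
		m.lastDeletedLevel = m.ackLevel
	}
	return nil
}
```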
How did you test it?
Created unit tests for it and tested it on a multi-cluster setup locally.
Potential risks
The TaskCompleter is inserted into the TaskListManager dispatcher, which is the path through which all async tasks are matched to pollers. It intercepts dispatches to determine whether the cluster is the domain's standby cluster. An issue here would also disturb the active cluster and most likely interrupt the processing of async tasks.
Release notes
Active completion of already started tasks on the domain's standby cluster.
Documentation Changes