-
Notifications
You must be signed in to change notification settings - Fork 5.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
disttask: maintain managed nodes separately #49623
Conversation
Codecov Report
Additional details and impacted files@@ Coverage Diff @@
## master #49623 +/- ##
================================================
+ Coverage 70.9801% 71.4163% +0.4362%
================================================
Files 1368 1427 +59
Lines 398761 423088 +24327
================================================
+ Hits 283041 302154 +19113
- Misses 95945 102010 +6065
+ Partials 19775 18924 -851
Flags with carried forward coverage won't be shown. Click here to find out more.
|
return nil, err | ||
} | ||
logutil.Logger(s.logCtx).Debug("eligible instances", zap.Int("num", len(serverNodes))) | ||
if len(serverNodes) == 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Then import into can't scale out nodes during execution?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this part is same as before
if task can only run on some nodes(len > 0), we use it, else we can use all managed nodes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I get it, instance is for local mode.
@@ -173,8 +172,7 @@ func TestBackfillingSchedulerGlobalSortMode(t *testing.T) { | |||
taskID, err := mgr.CreateTask(ctx, task.Key, proto.Backfill, 1, task.Meta) | |||
require.NoError(t, err) | |||
task.ID = taskID | |||
serverInfos, _, err := sch.GetEligibleInstances(context.Background(), task) | |||
require.NoError(t, err) | |||
serverInfos := []string{":4000"} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
serverInfos := []string{":4000"} | |
execIDs := []string{":4000"} |
// if returned instances is empty, it means all instances are eligible. | ||
// TODO: run import from server disk using framework makes this logic complicated, | ||
// the instance might not be managed by framework. | ||
GetEligibleInstances(ctx context.Context, task *proto.Task) ([]string, error) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can remove this interface?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
not now, server disk import requires this
@@ -120,9 +121,18 @@ func (sm *Manager) Start() { | |||
failpoint.Inject("disableSchedulerManager", func() { | |||
failpoint.Return() | |||
}) | |||
// init cached managed nodes | |||
sm.nodeMgr.refreshManagedNodes(sm.ctx, sm.taskMgr) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
what if refresh failed?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's called periodicly per second, here just init it to make test pass
return nil, err | ||
} | ||
logutil.Logger(s.logCtx).Debug("eligible instances", zap.Int("num", len(serverNodes))) | ||
if len(serverNodes) == 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I get it, instance is for local mode.
/cc @tangenta |
type NodeManager struct { | ||
// prevLiveNodes is used to record the live nodes in last checking. | ||
prevLiveNodes map[string]struct{} | ||
managedNodes atomic.Pointer[[]string] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
add some comments for managedNodes.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Co-authored-by: EasonBall <592838129@qq.com>
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
17/28
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rest lgtm
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: okJiang, ywqzzy The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
What problem does this PR solve?
Issue Number: ref #49008
Problem Summary:
What changed and how does it work?
GetEligibleInstances
returns target node that task can run, if no node returned, we use all.Check List
Tests
Side effects
Documentation
Release note
Please refer to Release Notes Language Style Guide to write a quality release note.