-
Notifications
You must be signed in to change notification settings - Fork 287
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
syncer(dm): fix the data race issue #5881
Changes from 10 commits
b0a1081
da24ce3
46c7326
bac0290
3df51cb
c51de33
7e4e37e
82bcc93
3ecbac5
2e56fa2
0ec251f
cd3c626
d3694b8
ccfff85
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||
---|---|---|---|---|---|---|
|
@@ -552,7 +552,13 @@ func (st *SubTask) markResultCanceled() bool { | |||||
func (st *SubTask) Result() *pb.ProcessResult { | ||||||
st.RLock() | ||||||
defer st.RUnlock() | ||||||
return st.result | ||||||
if st.result == nil { | ||||||
return nil | ||||||
} | ||||||
tempProcessResult, _ := st.result.Marshal() | ||||||
newProcessResult := &pb.ProcessResult{} | ||||||
_ = newProcessResult.Unmarshal(tempProcessResult) | ||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. ditto. Or:
Suggested change
Is it necessary to decode and encode it again? What about deep copying the object?
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Actually, it is necessary to decode and encode it, because Result() originally returns a pointer without the lock, which may cause data race when multiple functions are executing concurrently and calling Result(). In this case, deep copying would still return a pointer without the lock, which does not solve the problem. Here decoding and encoding again return a new copy of the process result, so that when multiple functions are calling Result() concurrently and using the returned pointer to modify things, they would be modifying the new copy, which has no effect on the original pointer, thus avoiding the potential data race. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Modified using deep copy. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. _ = newProcessResult.Unmarshal(tempProcessResult) |
||||||
return newProcessResult | ||||||
} | ||||||
|
||||||
// Close stops the sub task. | ||||||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -572,3 +572,35 @@ func TestValidatorStatus(t *testing.T) { | |
require.True(t, terror.ErrValidatorNotFound.Equal(err)) | ||
// validator != nil: will be tested in IT | ||
} | ||
|
||
func TestSubtaskRace(t *testing.T) { | ||
// to test data race of Marshal() and markResultCanceled() | ||
tempErrors := []*pb.ProcessError{} | ||
tempDetail := []byte{} | ||
tempProcessResult := pb.ProcessResult{ | ||
IsCanceled: false, | ||
Errors: tempErrors, | ||
Detail: tempDetail, | ||
} | ||
cfg := &config.SubTaskConfig{ | ||
Name: "test-subtask-race", | ||
ValidatorCfg: config.ValidatorConfig{ | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Will this issue do anything with the validator? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure. Sorry I am not quite familiar with the validator. |
||
Mode: config.ValidationFast, | ||
}, | ||
} | ||
st := NewSubTaskWithStage(cfg, pb.Stage_Paused, nil, "worker") | ||
st.result = &tempProcessResult | ||
tempQueryStatusResponse := pb.QueryStatusResponse{} | ||
tempQueryStatusResponse.SubTaskStatus = make([]*pb.SubTaskStatus, 1) | ||
tempSubTaskStatus := pb.SubTaskStatus{} | ||
tempSubTaskStatus.Result = st.Result() | ||
tempQueryStatusResponse.SubTaskStatus[0] = &tempSubTaskStatus | ||
st.result.IsCanceled = false | ||
go func() { | ||
for i := 0; i < 10; i++ { | ||
_, _ = tempQueryStatusResponse.Marshal() | ||
} | ||
}() | ||
st.markResultCanceled() | ||
// this test is to test data race, so don't need assert here | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It'd better check the error just for the sake of security.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got it. I have modified the code and added the error handling.