-
Notifications
You must be signed in to change notification settings - Fork 720
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Improve TSO proxy based on the existing TSO Follower Batching framework #6565
base: master
Are you sure you want to change the base?
Conversation
[REVIEW NOTIFICATION] This pull request has been approved by:
To complete the pull request process, please ask the reviewers in the list to review by filling The full list of commands accepted by this bot can be found here. Reviewer can indicate their review by submitting an approval review. |
Codecov ReportPatch coverage:
Additional details and impacted files@@ Coverage Diff @@
## master #6565 +/- ##
==========================================
- Coverage 74.80% 74.70% -0.10%
==========================================
Files 417 416 -1
Lines 42585 42629 +44
==========================================
- Hits 31855 31848 -7
- Misses 7936 7986 +50
- Partials 2794 2795 +1
Flags with carried forward coverage won't be shown. Click here to find out more.
☔ View full report in Codecov by Sentry. |
551830a
to
3625afb
Compare
@@ -114,9 +114,6 @@ type Server struct { | |||
keyspaceGroupManager *tso.KeyspaceGroupManager | |||
// Store as map[string]*grpc.ClientConn | |||
clientConns sync.Map | |||
// tsoDispatcher is used to dispatch the TSO requests to |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why will tso server remove it?
pkg/utils/tsoutil/tso_dispatcher.go
Outdated
cctx, cancel := context.WithCancel(ctx) | ||
defer cancel() | ||
done := make(chan struct{}) | ||
dl := deadline{ | ||
timer: time.After(DefaultTSOProxyTimeout), | ||
done: done, | ||
cancel: cancel, | ||
} | ||
select { | ||
case tsDeadlineCh <- dl: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
how about using context.WithTimeout
log.Info("exiting from the dispatch loop. cleaning up the pending requests", | ||
zap.String("forwarded-host", forwardedHost)) | ||
if forwardStream != nil { | ||
forwardStream.closeSend() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does it mean that a new stream is created for each request?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It won't, because we only start one dispatch loop (this function) for all requests with the same forwarded host until there is any forwarding related error which causes the loop to exit, and we do this check and forwardStream.closeSend() in defer func() when exiting from this loop.
@@ -398,14 +367,72 @@ func (s *GrpcServer) Tso(stream pdpb.PD_TsoServer) error { | |||
} | |||
} | |||
|
|||
func (s *GrpcServer) getForwardedHost(ctx, streamCtx context.Context) (forwardedHost string, err error) { | |||
// forwardTSO forwards the incoming TSO requests to the TSO microservice. | |||
func (s *GrpcServer) forwardTSO(stream pdpb.PD_TsoServer) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do we need to update getGlobalTSOFromTSOServer
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Before the change in this pr is proved to work as expected in dev/staging, I plan to keep the other RPCs unchanged and unimpacted.
1a40dbe
to
ff66f31
Compare
Signed-off-by: Bin Shi <binshi.bing@gmail.com>
Signed-off-by: Bin Shi <binshi.bing@gmail.com>
Signed-off-by: Bin Shi <binshi.bing@gmail.com>
Signed-off-by: Bin Shi <binshi.bing@gmail.com>
Signed-off-by: Bin Shi <binshi.bing@gmail.com>
Signed-off-by: Bin Shi <binshi.bing@gmail.com>
Signed-off-by: Bin Shi <binshi.bing@gmail.com>
Signed-off-by: Bin Shi <binshi.bing@gmail.com>
Signed-off-by: Bin Shi <binshi.bing@gmail.com>
7aaed93
to
8c070de
Compare
… gPRC stream (#6572) close #6549, ref #6565 Simplify tso proxy implementation by using one forward stream for one grpc.ServerStream. #6565 is a longer term solution for both follower batching and tso microservice. It's well implemented, but just need more time to bake, and we need a short term workable solution for now. Signed-off-by: Bin Shi <binshi.bing@gmail.com>
PR needs rebase. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes/test-infra repository. |
… gPRC stream (tikv#6572) close tikv#6549, ref tikv#6565 Simplify tso proxy implementation by using one forward stream for one grpc.ServerStream. tikv#6565 is a longer term solution for both follower batching and tso microservice. It's well implemented, but just need more time to bake, and we need a short term workable solution for now. Signed-off-by: Bin Shi <binshi.bing@gmail.com>
What problem does this PR solve?
Issue Number: Close #6549
What is changed and how does it work?
Check List
Tests
Release note