From a9b312b43d8e55ac55c68c28e9778271fd6a08ab Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Tue, 23 Jul 2024 17:45:32 +0800 Subject: [PATCH 1/3] Abstract tsostream Signed-off-by: MyonKeminta --- client/tso_client.go | 4 +- client/tso_dispatcher.go | 4 +- client/tso_stream.go | 262 ++++++++++++++++++++++++++++----------- 3 files changed, 196 insertions(+), 74 deletions(-) diff --git a/client/tso_client.go b/client/tso_client.go index 5e221eae478..4276f46ccc8 100644 --- a/client/tso_client.go +++ b/client/tso_client.go @@ -352,7 +352,7 @@ type tsoConnectionContext struct { // Current stream to send gRPC requests. // - `pdpb.PD_TsoClient` for a leader/follower in the PD cluster. // - `tsopb.TSO_TsoClient` for a primary/secondary in the TSO cluster. - stream tsoStream + stream *tsoStream } // updateConnectionCtxs will choose the proper way to update the connections for the given dc-location. @@ -382,7 +382,7 @@ func (c *tsoClient) tryConnectToTSO( var ( networkErrNum uint64 err error - stream tsoStream + stream *tsoStream url string cc *grpc.ClientConn updateAndClear = func(newURL string, connectionCtx *tsoConnectionContext) { diff --git a/client/tso_dispatcher.go b/client/tso_dispatcher.go index 0919fd84744..a7c99057275 100644 --- a/client/tso_dispatcher.go +++ b/client/tso_dispatcher.go @@ -186,7 +186,7 @@ func (td *tsoDispatcher) handleDispatcher(wg *sync.WaitGroup) { streamCtx context.Context cancel context.CancelFunc streamURL string - stream tsoStream + stream *tsoStream ) // Loop through each batch of TSO requests and send them for processing. streamLoopTimer := time.NewTimer(option.timeout) @@ -393,7 +393,7 @@ func chooseStream(connectionCtxs *sync.Map) (connectionCtx *tsoConnectionContext } func (td *tsoDispatcher) processRequests( - stream tsoStream, dcLocation string, tbc *tsoBatchController, + stream *tsoStream, dcLocation string, tbc *tsoBatchController, ) error { var ( requests = tbc.getCollectedRequests() diff --git a/client/tso_stream.go b/client/tso_stream.go index 9c4d78dfe18..4faf081c25d 100644 --- a/client/tso_stream.go +++ b/client/tso_stream.go @@ -47,7 +47,7 @@ func (*tsoTSOStreamBuilderFactory) makeBuilder(cc *grpc.ClientConn) tsoStreamBui // TSO Stream Builder type tsoStreamBuilder interface { - build(context.Context, context.CancelFunc, time.Duration) (tsoStream, error) + build(context.Context, context.CancelFunc, time.Duration) (*tsoStream, error) } type pdTSOStreamBuilder struct { @@ -55,14 +55,14 @@ type pdTSOStreamBuilder struct { client pdpb.PDClient } -func (b *pdTSOStreamBuilder) build(ctx context.Context, cancel context.CancelFunc, timeout time.Duration) (tsoStream, error) { +func (b *pdTSOStreamBuilder) build(ctx context.Context, cancel context.CancelFunc, timeout time.Duration) (*tsoStream, error) { done := make(chan struct{}) // TODO: we need to handle a conner case that this goroutine is timeout while the stream is successfully created. go checkStreamTimeout(ctx, cancel, done, timeout) stream, err := b.client.Tso(ctx) done <- struct{}{} if err == nil { - return &pdTSOStream{stream: stream, serverURL: b.serverURL}, nil + return &tsoStream{stream: pdTSOStreamAdapter{stream}, serverURL: b.serverURL}, nil } return nil, err } @@ -74,14 +74,14 @@ type tsoTSOStreamBuilder struct { func (b *tsoTSOStreamBuilder) build( ctx context.Context, cancel context.CancelFunc, timeout time.Duration, -) (tsoStream, error) { +) (*tsoStream, error) { done := make(chan struct{}) // TODO: we need to handle a conner case that this goroutine is timeout while the stream is successfully created. go checkStreamTimeout(ctx, cancel, done, timeout) stream, err := b.client.Tso(ctx) done <- struct{}{} if err == nil { - return &tsoTSOStream{stream: stream, serverURL: b.serverURL}, nil + return &tsoStream{stream: tsoTSOStreamAdapter{stream}, serverURL: b.serverURL}, nil } return nil, err } @@ -99,30 +99,24 @@ func checkStreamTimeout(ctx context.Context, cancel context.CancelFunc, done cha <-done } -// TSO Stream - -type tsoStream interface { - getServerURL() string - // processRequests processes TSO requests in streaming mode to get timestamps - processRequests( - clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string, - count int64, batchStartTime time.Time, - ) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) +type tsoRequestResult struct { + physical, logical int64 + count uint32 + suffixBits uint32 + respKeyspaceGroupID uint32 } -type pdTSOStream struct { - serverURL string - stream pdpb.PD_TsoClient +type grpcTSOStreamAdapter interface { + Send(clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string, + count int64) error + Recv() (tsoRequestResult, error) } -func (s *pdTSOStream) getServerURL() string { - return s.serverURL +type pdTSOStreamAdapter struct { + stream pdpb.PD_TsoClient } -func (s *pdTSOStream) processRequests( - clusterID uint64, _, _ uint32, dcLocation string, count int64, batchStartTime time.Time, -) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) { - start := time.Now() +func (s pdTSOStreamAdapter) Send(clusterID uint64, _, _ uint32, dcLocation string, count int64) error { req := &pdpb.TsoRequest{ Header: &pdpb.RequestHeader{ ClusterId: clusterID, @@ -130,55 +124,28 @@ func (s *pdTSOStream) processRequests( Count: uint32(count), DcLocation: dcLocation, } + return s.stream.Send(req) +} - if err = s.stream.Send(req); err != nil { - if err == io.EOF { - err = errs.ErrClientTSOStreamClosed - } else { - err = errors.WithStack(err) - } - return - } - tsoBatchSendLatency.Observe(time.Since(batchStartTime).Seconds()) +func (s pdTSOStreamAdapter) Recv() (tsoRequestResult, error) { resp, err := s.stream.Recv() - duration := time.Since(start).Seconds() if err != nil { - requestFailedDurationTSO.Observe(duration) - if err == io.EOF { - err = errs.ErrClientTSOStreamClosed - } else { - err = errors.WithStack(err) - } - return + return tsoRequestResult{}, err } - requestDurationTSO.Observe(duration) - tsoBatchSize.Observe(float64(count)) - - if resp.GetCount() != uint32(count) { - err = errors.WithStack(errTSOLength) - return - } - - ts := resp.GetTimestamp() - respKeyspaceGroupID = defaultKeySpaceGroupID - physical, logical, suffixBits = ts.GetPhysical(), ts.GetLogical(), ts.GetSuffixBits() - return + return tsoRequestResult{ + physical: resp.GetTimestamp().GetPhysical(), + logical: resp.GetTimestamp().GetLogical(), + count: resp.GetCount(), + suffixBits: resp.GetTimestamp().GetSuffixBits(), + respKeyspaceGroupID: defaultKeySpaceGroupID, + }, nil } -type tsoTSOStream struct { - serverURL string - stream tsopb.TSO_TsoClient -} - -func (s *tsoTSOStream) getServerURL() string { - return s.serverURL +type tsoTSOStreamAdapter struct { + stream tsopb.TSO_TsoClient } -func (s *tsoTSOStream) processRequests( - clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string, - count int64, batchStartTime time.Time, -) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) { - start := time.Now() +func (s tsoTSOStreamAdapter) Send(clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string, count int64) error { req := &tsopb.TsoRequest{ Header: &tsopb.RequestHeader{ ClusterId: clusterID, @@ -188,8 +155,54 @@ func (s *tsoTSOStream) processRequests( Count: uint32(count), DcLocation: dcLocation, } + return s.stream.Send(req) +} + +func (s tsoTSOStreamAdapter) Recv() (tsoRequestResult, error) { + resp, err := s.stream.Recv() + if err != nil { + return tsoRequestResult{}, err + } + return tsoRequestResult{ + physical: resp.GetTimestamp().GetPhysical(), + logical: resp.GetTimestamp().GetLogical(), + count: resp.GetCount(), + suffixBits: resp.GetTimestamp().GetSuffixBits(), + respKeyspaceGroupID: resp.GetHeader().GetKeyspaceGroupId(), + }, nil +} + +//// TSO Stream +// +//type tsoStream interface { +// getServerURL() string +// //// processRequests processes TSO requests in streaming mode to get timestamps +// //processRequests( +// // clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string, +// // count int64, batchStartTime time.Time, +// //) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) +// processRequestsAsync( +// clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string, +// count int64, batchStartTime time.Time, resultCh chan<- tsoRequestResult, +// ) +// +// Close() +//} + +type tsoStream struct { + serverURL string + stream grpcTSOStreamAdapter +} + +func (s *tsoStream) getServerURL() string { + return s.serverURL +} - if err = s.stream.Send(req); err != nil { +func (s *tsoStream) processRequests( + clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string, count int64, batchStartTime time.Time, +) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) { + start := time.Now() + if err = s.stream.Send(clusterID, keyspaceID, keyspaceGroupID, dcLocation, count); err != nil { if err == io.EOF { err = errs.ErrClientTSOStreamClosed } else { @@ -198,7 +211,7 @@ func (s *tsoTSOStream) processRequests( return } tsoBatchSendLatency.Observe(time.Since(batchStartTime).Seconds()) - resp, err := s.stream.Recv() + res, err := s.stream.Recv() duration := time.Since(start).Seconds() if err != nil { requestFailedDurationTSO.Observe(duration) @@ -212,13 +225,122 @@ func (s *tsoTSOStream) processRequests( requestDurationTSO.Observe(duration) tsoBatchSize.Observe(float64(count)) - if resp.GetCount() != uint32(count) { + if res.count != uint32(count) { err = errors.WithStack(errTSOLength) return } - ts := resp.GetTimestamp() - respKeyspaceGroupID = resp.GetHeader().GetKeyspaceGroupId() - physical, logical, suffixBits = ts.GetPhysical(), ts.GetLogical(), ts.GetSuffixBits() + respKeyspaceGroupID = res.respKeyspaceGroupID + physical, logical, suffixBits = res.physical, res.logical, res.suffixBits return } + +// +//type pdTSOStream struct { +// serverURL string +// stream pdpb.PD_TsoClient +//} +// +//func (s *pdTSOStream) getServerURL() string { +// return s.serverURL +//} +// +//func (s *pdTSOStream) processRequests( +// clusterID uint64, _, _ uint32, dcLocation string, count int64, batchStartTime time.Time, +//) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) { +// start := time.Now() +// req := &pdpb.TsoRequest{ +// Header: &pdpb.RequestHeader{ +// ClusterId: clusterID, +// }, +// Count: uint32(count), +// DcLocation: dcLocation, +// } +// +// if err = s.stream.Send(req); err != nil { +// if err == io.EOF { +// err = errs.ErrClientTSOStreamClosed +// } else { +// err = errors.WithStack(err) +// } +// return +// } +// tsoBatchSendLatency.Observe(float64(time.Since(batchStartTime))) +// resp, err := s.stream.Recv() +// if err != nil { +// if err == io.EOF { +// err = errs.ErrClientTSOStreamClosed +// } else { +// err = errors.WithStack(err) +// } +// return +// } +// requestDurationTSO.Observe(time.Since(start).Seconds()) +// tsoBatchSize.Observe(float64(count)) +// +// if resp.GetCount() != uint32(count) { +// err = errors.WithStack(errTSOLength) +// return +// } +// +// ts := resp.GetTimestamp() +// respKeyspaceGroupID = defaultKeySpaceGroupID +// physical, logical, suffixBits = ts.GetPhysical(), ts.GetLogical(), ts.GetSuffixBits() +// return +//} +// +//type tsoTSOStream struct { +// serverURL string +// stream tsopb.TSO_TsoClient +//} +// +//func (s *tsoTSOStream) getServerURL() string { +// return s.serverURL +//} +// +//func (s *tsoTSOStream) processRequests( +// clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string, +// count int64, batchStartTime time.Time, +//) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) { +// start := time.Now() +// req := &tsopb.TsoRequest{ +// Header: &tsopb.RequestHeader{ +// ClusterId: clusterID, +// KeyspaceId: keyspaceID, +// KeyspaceGroupId: keyspaceGroupID, +// }, +// Count: uint32(count), +// DcLocation: dcLocation, +// } +// +// if err = s.stream.Send(req); err != nil { +// if err == io.EOF { +// err = errs.ErrClientTSOStreamClosed +// } else { +// err = errors.WithStack(err) +// } +// return +// } +// tsoBatchSendLatency.Observe(float64(time.Since(batchStartTime))) +// resp, err := s.stream.Recv() +// if err != nil { +// if err == io.EOF { +// err = errs.ErrClientTSOStreamClosed +// } else { +// err = errors.WithStack(err) +// } +// return +// } +// requestDurationTSO.Observe(time.Since(start).Seconds()) +// tsoBatchSize.Observe(float64(count)) +// +// if resp.GetCount() != uint32(count) { +// err = errors.WithStack(errTSOLength) +// return +// } +// +// ts := resp.GetTimestamp() +// respKeyspaceGroupID = resp.GetHeader().GetKeyspaceGroupId() +// physical, logical, suffixBits = ts.GetPhysical(), ts.GetLogical(), ts.GetSuffixBits() +// return +//} From 8dfa046cec1dd68ca8d40272f672a75e539957a3 Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Tue, 23 Jul 2024 17:52:23 +0800 Subject: [PATCH 2/3] Remove commented code Signed-off-by: MyonKeminta --- client/tso_stream.go | 127 ------------------------------------------- 1 file changed, 127 deletions(-) diff --git a/client/tso_stream.go b/client/tso_stream.go index 4faf081c25d..23cba95446e 100644 --- a/client/tso_stream.go +++ b/client/tso_stream.go @@ -172,23 +172,6 @@ func (s tsoTSOStreamAdapter) Recv() (tsoRequestResult, error) { }, nil } -//// TSO Stream -// -//type tsoStream interface { -// getServerURL() string -// //// processRequests processes TSO requests in streaming mode to get timestamps -// //processRequests( -// // clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string, -// // count int64, batchStartTime time.Time, -// //) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) -// processRequestsAsync( -// clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string, -// count int64, batchStartTime time.Time, resultCh chan<- tsoRequestResult, -// ) -// -// Close() -//} - type tsoStream struct { serverURL string stream grpcTSOStreamAdapter @@ -234,113 +217,3 @@ func (s *tsoStream) processRequests( physical, logical, suffixBits = res.physical, res.logical, res.suffixBits return } - -// -//type pdTSOStream struct { -// serverURL string -// stream pdpb.PD_TsoClient -//} -// -//func (s *pdTSOStream) getServerURL() string { -// return s.serverURL -//} -// -//func (s *pdTSOStream) processRequests( -// clusterID uint64, _, _ uint32, dcLocation string, count int64, batchStartTime time.Time, -//) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) { -// start := time.Now() -// req := &pdpb.TsoRequest{ -// Header: &pdpb.RequestHeader{ -// ClusterId: clusterID, -// }, -// Count: uint32(count), -// DcLocation: dcLocation, -// } -// -// if err = s.stream.Send(req); err != nil { -// if err == io.EOF { -// err = errs.ErrClientTSOStreamClosed -// } else { -// err = errors.WithStack(err) -// } -// return -// } -// tsoBatchSendLatency.Observe(float64(time.Since(batchStartTime))) -// resp, err := s.stream.Recv() -// if err != nil { -// if err == io.EOF { -// err = errs.ErrClientTSOStreamClosed -// } else { -// err = errors.WithStack(err) -// } -// return -// } -// requestDurationTSO.Observe(time.Since(start).Seconds()) -// tsoBatchSize.Observe(float64(count)) -// -// if resp.GetCount() != uint32(count) { -// err = errors.WithStack(errTSOLength) -// return -// } -// -// ts := resp.GetTimestamp() -// respKeyspaceGroupID = defaultKeySpaceGroupID -// physical, logical, suffixBits = ts.GetPhysical(), ts.GetLogical(), ts.GetSuffixBits() -// return -//} -// -//type tsoTSOStream struct { -// serverURL string -// stream tsopb.TSO_TsoClient -//} -// -//func (s *tsoTSOStream) getServerURL() string { -// return s.serverURL -//} -// -//func (s *tsoTSOStream) processRequests( -// clusterID uint64, keyspaceID, keyspaceGroupID uint32, dcLocation string, -// count int64, batchStartTime time.Time, -//) (respKeyspaceGroupID uint32, physical, logical int64, suffixBits uint32, err error) { -// start := time.Now() -// req := &tsopb.TsoRequest{ -// Header: &tsopb.RequestHeader{ -// ClusterId: clusterID, -// KeyspaceId: keyspaceID, -// KeyspaceGroupId: keyspaceGroupID, -// }, -// Count: uint32(count), -// DcLocation: dcLocation, -// } -// -// if err = s.stream.Send(req); err != nil { -// if err == io.EOF { -// err = errs.ErrClientTSOStreamClosed -// } else { -// err = errors.WithStack(err) -// } -// return -// } -// tsoBatchSendLatency.Observe(float64(time.Since(batchStartTime))) -// resp, err := s.stream.Recv() -// if err != nil { -// if err == io.EOF { -// err = errs.ErrClientTSOStreamClosed -// } else { -// err = errors.WithStack(err) -// } -// return -// } -// requestDurationTSO.Observe(time.Since(start).Seconds()) -// tsoBatchSize.Observe(float64(count)) -// -// if resp.GetCount() != uint32(count) { -// err = errors.WithStack(errTSOLength) -// return -// } -// -// ts := resp.GetTimestamp() -// respKeyspaceGroupID = resp.GetHeader().GetKeyspaceGroupId() -// physical, logical, suffixBits = ts.GetPhysical(), ts.GetLogical(), ts.GetSuffixBits() -// return -//} From 655a55779e758cfe5b01374e5d9f16ddf951985d Mon Sep 17 00:00:00 2001 From: MyonKeminta Date: Thu, 25 Jul 2024 17:47:12 +0800 Subject: [PATCH 3/3] Adjust comments Signed-off-by: MyonKeminta --- client/tso_client.go | 2 -- client/tso_stream.go | 5 ++++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/client/tso_client.go b/client/tso_client.go index 4276f46ccc8..2f3b949f017 100644 --- a/client/tso_client.go +++ b/client/tso_client.go @@ -350,8 +350,6 @@ type tsoConnectionContext struct { // Current URL of the stream connection. streamURL string // Current stream to send gRPC requests. - // - `pdpb.PD_TsoClient` for a leader/follower in the PD cluster. - // - `tsopb.TSO_TsoClient` for a primary/secondary in the TSO cluster. stream *tsoStream } diff --git a/client/tso_stream.go b/client/tso_stream.go index 23cba95446e..da9cab95ba0 100644 --- a/client/tso_stream.go +++ b/client/tso_stream.go @@ -174,7 +174,10 @@ func (s tsoTSOStreamAdapter) Recv() (tsoRequestResult, error) { type tsoStream struct { serverURL string - stream grpcTSOStreamAdapter + // The internal gRPC stream. + // - `pdpb.PD_TsoClient` for a leader/follower in the PD cluster. + // - `tsopb.TSO_TsoClient` for a primary/secondary in the TSO cluster. + stream grpcTSOStreamAdapter } func (s *tsoStream) getServerURL() string {