Skip to content

Commit

Permalink
client: Support parallel TSO RPC requests on single dispatcher loop (#…
Browse files Browse the repository at this point in the history
…8633)

close #8432

client: Support parallel TSO RPC requests on single dispatcher loop

This commit supports handling multiple TSO RPC concurrently in one single dispatcher loop to reduce the expected time that each GetTS call spent on waiting the next batch.

Signed-off-by: MyonKeminta <MyonKeminta@users.noreply.github.com>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
MyonKeminta and ti-chi-bot[bot] authored Sep 26, 2024
1 parent 7907679 commit 642f0e9
Show file tree
Hide file tree
Showing 9 changed files with 828 additions and 50 deletions.
6 changes: 6 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -797,6 +797,12 @@ func (c *client) UpdateOption(option DynamicOption, value any) error {
return errors.New("[pd] invalid value type for EnableFollowerHandle option, it should be bool")
}
c.option.setEnableFollowerHandle(enable)
case TSOClientRPCConcurrency:
value, ok := value.(int)
if !ok {
return errors.New("[pd] invalid value type for TSOClientRPCConcurrency option, it should be int")
}
c.option.setTSOClientRPCConcurrency(value)
default:
return errors.New("[pd] unsupported client option")
}
Expand Down
10 changes: 10 additions & 0 deletions client/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ var (
tsoBatchSendLatency prometheus.Histogram
requestForwarded *prometheus.GaugeVec
ongoingRequestCountGauge *prometheus.GaugeVec
estimateTSOLatencyGauge *prometheus.GaugeVec
)

func initMetrics(constLabels prometheus.Labels) {
Expand Down Expand Up @@ -127,6 +128,14 @@ func initMetrics(constLabels prometheus.Labels) {
Help: "Current count of ongoing batch tso requests",
ConstLabels: constLabels,
}, []string{"stream"})
estimateTSOLatencyGauge = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Namespace: "pd_client",
Subsystem: "request",
Name: "estimate_tso_latency",
Help: "Estimated latency of an RTT of getting TSO",
ConstLabels: constLabels,
}, []string{"stream"})
}

var (
Expand Down Expand Up @@ -236,4 +245,5 @@ func registerMetrics() {
prometheus.MustRegister(tsoBatchSize)
prometheus.MustRegister(tsoBatchSendLatency)
prometheus.MustRegister(requestForwarded)
prometheus.MustRegister(estimateTSOLatencyGauge)
}
15 changes: 15 additions & 0 deletions client/option.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const (
defaultMaxTSOBatchWaitInterval time.Duration = 0
defaultEnableTSOFollowerProxy = false
defaultEnableFollowerHandle = false
defaultTSOClientRPCConcurrency = 1
)

// DynamicOption is used to distinguish the dynamic option type.
Expand All @@ -43,6 +44,8 @@ const (
EnableTSOFollowerProxy
// EnableFollowerHandle is the follower handle option.
EnableFollowerHandle
// TSOClientRPCConcurrency controls the amount of ongoing TSO RPC requests at the same time in a single TSO client.
TSOClientRPCConcurrency

dynamicOptionCount
)
Expand Down Expand Up @@ -77,6 +80,7 @@ func newOption() *option {
co.dynamicOptions[MaxTSOBatchWaitInterval].Store(defaultMaxTSOBatchWaitInterval)
co.dynamicOptions[EnableTSOFollowerProxy].Store(defaultEnableTSOFollowerProxy)
co.dynamicOptions[EnableFollowerHandle].Store(defaultEnableFollowerHandle)
co.dynamicOptions[TSOClientRPCConcurrency].Store(defaultTSOClientRPCConcurrency)
return co
}

Expand Down Expand Up @@ -127,3 +131,14 @@ func (o *option) setEnableTSOFollowerProxy(enable bool) {
func (o *option) getEnableTSOFollowerProxy() bool {
return o.dynamicOptions[EnableTSOFollowerProxy].Load().(bool)
}

func (o *option) setTSOClientRPCConcurrency(value int) {
old := o.getTSOClientRPCConcurrency()
if value != old {
o.dynamicOptions[TSOClientRPCConcurrency].Store(value)
}
}

func (o *option) getTSOClientRPCConcurrency() int {
return o.dynamicOptions[TSOClientRPCConcurrency].Load().(int)
}
42 changes: 42 additions & 0 deletions client/tso_batch_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,17 @@ func (tbc *tsoBatchController) fetchPendingRequests(ctx context.Context, tsoRequ
// TODO: `tbc.collectedRequestCount` should never be non-empty here. Consider do assertion here.
tbc.collectedRequestCount = 0
for {
// If the batch size reaches the maxBatchSize limit but the token haven't arrived yet, don't receive more
// requests, and return when token is ready.
if tbc.collectedRequestCount >= tbc.maxBatchSize && !tokenAcquired {
select {
case <-ctx.Done():
return ctx.Err()
case <-tokenCh:
return nil
}
}

select {
case <-ctx.Done():
return ctx.Err()
Expand Down Expand Up @@ -146,6 +157,37 @@ fetchPendingRequestsLoop:
return nil
}

// fetchRequestsWithTimer tries to fetch requests until the given timer ticks. The caller must set the timer properly
// before calling this function.
func (tbc *tsoBatchController) fetchRequestsWithTimer(ctx context.Context, tsoRequestCh <-chan *tsoRequest, timer *time.Timer) error {
batchingLoop:
for tbc.collectedRequestCount < tbc.maxBatchSize {
select {
case <-ctx.Done():
return ctx.Err()
case req := <-tsoRequestCh:
tbc.pushRequest(req)
case <-timer.C:
break batchingLoop
}
}

// Try to collect more requests in non-blocking way.
nonWaitingBatchLoop:
for tbc.collectedRequestCount < tbc.maxBatchSize {
select {
case <-ctx.Done():
return ctx.Err()
case req := <-tsoRequestCh:
tbc.pushRequest(req)
default:
break nonWaitingBatchLoop
}
}

return nil
}

func (tbc *tsoBatchController) pushRequest(tsoReq *tsoRequest) {
tbc.collectedRequests[tbc.collectedRequestCount] = tsoReq
tbc.collectedRequestCount++
Expand Down
Loading

0 comments on commit 642f0e9

Please sign in to comment.