Skip to content

Commit 1c16548

Browse files
authored
server/handler: register missing http handlers related to ingest param (#63099) (#63485)
ref #61553
1 parent c4c113b commit 1c16548

File tree

4 files changed

+77
-15
lines changed

4 files changed

+77
-15
lines changed

pkg/lightning/backend/local/rate_limiter_param.go

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -46,33 +46,29 @@ var (
4646
// InitializeRateLimiterParam initializes the rate limiter params.
4747
func InitializeRateLimiterParam(m *meta.Mutator, logger *zap.Logger) error {
4848
err := initializeVariables(
49-
m.GetIngestMaxBatchSplitRanges,
50-
defaultMaxBatchSplitRanges,
51-
&CurrentMaxBatchSplitRanges,
49+
m.GetIngestMaxBatchSplitRanges, m.SetIngestMaxBatchSplitRanges,
50+
defaultMaxBatchSplitRanges, &CurrentMaxBatchSplitRanges,
5251
logger, "maxBatchSplitRanges")
5352
if err != nil {
5453
return err
5554
}
5655
err = initializeVariables(
57-
m.GetIngestMaxSplitRangesPerSec,
58-
defaultSplitRangesPerSec,
59-
&CurrentMaxSplitRangesPerSec,
56+
m.GetIngestMaxSplitRangesPerSec, m.SetIngestMaxSplitRangesPerSec,
57+
defaultSplitRangesPerSec, &CurrentMaxSplitRangesPerSec,
6058
logger, "maxSplitRangesPerSec")
6159
if err != nil {
6260
return err
6361
}
6462
err = initializeVariables(
65-
m.GetIngestMaxInflight,
66-
defaultMaxIngestInflight,
67-
&CurrentMaxIngestInflight,
63+
m.GetIngestMaxInflight, m.SetIngestMaxInflight,
64+
defaultMaxIngestInflight, &CurrentMaxIngestInflight,
6865
logger, "maxIngestInflight")
6966
if err != nil {
7067
return err
7168
}
7269
err = initializeVariables(
73-
m.GetIngestMaxPerSec,
74-
defaultMaxIngestPerSec,
75-
&CurrentMaxIngestPerSec,
70+
m.GetIngestMaxPerSec, m.SetIngestMaxPerSec,
71+
defaultMaxIngestPerSec, &CurrentMaxIngestPerSec,
7672
logger, "maxIngestPerSec")
7773
if err != nil {
7874
return err
@@ -82,6 +78,7 @@ func InitializeRateLimiterParam(m *meta.Mutator, logger *zap.Logger) error {
8278

8379
func initializeVariables[T comparable](
8480
metaGetter func() (v T, isNull bool, err error),
81+
metaSetter func(v T) error,
8582
defaultVal T,
8683
globalVar *atomic.Pointer[T],
8784
logger *zap.Logger,
@@ -92,11 +89,17 @@ func initializeVariables[T comparable](
9289
return errors.Annotatef(err, "failed to read %s value from meta store", varName)
9390
}
9491
var zero T
95-
if isNull || val == zero {
92+
if isNull {
93+
err = metaSetter(defaultVal)
94+
if err != nil {
95+
return errors.Annotatef(err, "failed to set %s value to meta store", varName)
96+
}
9697
val = defaultVal
9798
logger.Info("meta kv not found in meta store, initialized to default and persisted",
9899
zap.String("key", varName),
99100
zap.Any("value", defaultVal))
101+
} else if val == zero {
102+
val = defaultVal
100103
} else {
101104
logger.Info("loaded value from meta store",
102105
zap.String("key", varName),

pkg/server/handler/tests/BUILD.bazel

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,7 @@ go_test(
99
"main_test.go",
1010
],
1111
flaky = True,
12-
race = "on",
13-
shard_count = 39,
12+
shard_count = 40,
1413
deps = [
1514
"//pkg/config",
1615
"//pkg/ddl",

pkg/server/handler/tests/http_handler_serial_test.go

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -716,3 +716,53 @@ func TestTTL(t *testing.T) {
716716
require.Nil(t, obj)
717717
require.EqualError(t, err, "http status: 400 Bad Request, table test_ttl.t2 not exists")
718718
}
719+
720+
func TestIngestParam(t *testing.T) {
721+
ts := createBasicHTTPHandlerTestSuite()
722+
ts.startServer(t)
723+
defer ts.stopServer(t)
724+
725+
testCases := []struct {
726+
url string
727+
defaultVal any
728+
modVal any
729+
expectedVal any
730+
}{
731+
{"/ingest/max-batch-split-ranges", float64(2048), 1000, float64(1000)},
732+
{"/ingest/max-split-ranges-per-sec", float64(0), 2000, float64(2000)},
733+
{"/ingest/max-ingest-inflight", float64(0), 1000, float64(1000)},
734+
{"/ingest/max-ingest-per-sec", float64(0), 2000, float64(2000)},
735+
}
736+
737+
for _, tc := range testCases {
738+
t.Run(tc.url, func(t *testing.T) {
739+
resp, err := ts.FetchStatus(tc.url)
740+
require.NoError(t, err)
741+
defer func() { require.NoError(t, resp.Body.Close()) }()
742+
require.Equal(t, http.StatusOK, resp.StatusCode)
743+
decoder := json.NewDecoder(resp.Body)
744+
var payload struct {
745+
Value float64 `json:"value"`
746+
IsNull bool `json:"is_null"`
747+
}
748+
err = decoder.Decode(&payload)
749+
require.NoError(t, err)
750+
require.Equal(t, tc.defaultVal, payload.Value)
751+
752+
resp, err = ts.PostStatus(tc.url, "", bytes.NewBuffer([]byte(fmt.Sprintf(`{"value": %v}`, tc.modVal))))
753+
require.NoError(t, err)
754+
require.NotNil(t, resp)
755+
defer func() { require.NoError(t, resp.Body.Close()) }()
756+
require.Equal(t, http.StatusOK, resp.StatusCode)
757+
758+
resp, err = ts.FetchStatus(tc.url)
759+
require.NoError(t, err)
760+
defer func() { require.NoError(t, resp.Body.Close()) }()
761+
require.Equal(t, http.StatusOK, resp.StatusCode)
762+
decoder = json.NewDecoder(resp.Body)
763+
err = decoder.Decode(&payload)
764+
require.NoError(t, err)
765+
require.Equal(t, tc.expectedVal, payload.Value)
766+
})
767+
}
768+
}

pkg/server/http_status.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -259,6 +259,16 @@ func (s *Server) startHTTPServer() {
259259
// HTTP path for upgrade operations.
260260
router.Handle("/upgrade/{op}", handler.NewClusterUpgradeHandler(tikvHandlerTool.Store.(kv.Storage))).Name("upgrade operations")
261261

262+
// HTTP path for ingest configurations
263+
router.Handle("/ingest/max-batch-split-ranges", tikvhandler.NewIngestConcurrencyHandler(
264+
tikvHandlerTool, tikvhandler.IngestParamMaxBatchSplitRanges)).Name("IngestMaxBatchSplitRanges")
265+
router.Handle("/ingest/max-split-ranges-per-sec", tikvhandler.NewIngestConcurrencyHandler(
266+
tikvHandlerTool, tikvhandler.IngestParamMaxSplitRangesPerSec)).Name("IngestMaxSplitRangesPerSec")
267+
router.Handle("/ingest/max-ingest-inflight", tikvhandler.NewIngestConcurrencyHandler(
268+
tikvHandlerTool, tikvhandler.IngestParamMaxInflight)).Name("IngestMaxInflight")
269+
router.Handle("/ingest/max-ingest-per-sec", tikvhandler.NewIngestConcurrencyHandler(
270+
tikvHandlerTool, tikvhandler.IngestParamMaxPerSecond)).Name("IngestMaxPerSec")
271+
262272
if s.cfg.Store == "tikv" {
263273
// HTTP path for tikv.
264274
router.Handle("/tables/{db}/{table}/regions", tikvhandler.NewTableHandler(tikvHandlerTool, tikvhandler.OpTableRegions))

0 commit comments

Comments
 (0)