Skip to content

Commit

Permalink
*: init ctx for extractWorker (#55228)
Browse files Browse the repository at this point in the history
close #55217
  • Loading branch information
lance6716 authored Aug 6, 2024
1 parent 1cd11bc commit d940b7d
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 14 deletions.
2 changes: 1 addition & 1 deletion pkg/domain/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -2088,7 +2088,7 @@ func (do *Domain) SetupDumpFileGCChecker(ctx sessionctx.Context) {

// SetupExtractHandle setups extract handler
func (do *Domain) SetupExtractHandle(sctxs []sessionctx.Context) {
do.extractTaskHandle = NewExtractHandler(sctxs)
do.extractTaskHandle = newExtractHandler(do.ctx, sctxs)
}

var planReplayerHandleLease atomic.Uint64
Expand Down
13 changes: 9 additions & 4 deletions pkg/domain/extract.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,10 @@ type ExtractHandle struct {
worker *extractWorker
}

// NewExtractHandler new extract handler
func NewExtractHandler(sctxs []sessionctx.Context) *ExtractHandle {
// newExtractHandler new extract handler
func newExtractHandler(ctx context.Context, sctxs []sessionctx.Context) *ExtractHandle {
h := &ExtractHandle{}
h.worker = newExtractWorker(sctxs[0], false)
h.worker = newExtractWorker(ctx, sctxs[0], false)
return h
}

Expand Down Expand Up @@ -121,8 +121,13 @@ func NewExtractPlanTask(begin, end time.Time) *ExtractTask {
}
}

func newExtractWorker(sctx sessionctx.Context, isBackgroundWorker bool) *extractWorker {
func newExtractWorker(
ctx context.Context,
sctx sessionctx.Context,
isBackgroundWorker bool,
) *extractWorker {
return &extractWorker{
ctx: ctx,
sctx: sctx,
isBackgroundWorker: isBackgroundWorker,
}
Expand Down
5 changes: 5 additions & 0 deletions pkg/infoschema/sieve.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"context"
"sync"

"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/infoschema/internal"
)

Expand Down Expand Up @@ -150,6 +151,10 @@ func (s *Sieve[K, V]) Set(key K, value V) {
}

func (s *Sieve[K, V]) Get(key K) (value V, ok bool) {
failpoint.Inject("skipGet", func() {
var v V
failpoint.Return(v, false)
})
s.mu.Lock()
defer s.mu.Unlock()
if e, ok := s.items[key]; ok {
Expand Down
3 changes: 2 additions & 1 deletion pkg/server/handler/extractorhandler/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ go_test(
":extractorhandler",
"//pkg/config",
"//pkg/domain",
"//pkg/kv",
"//pkg/metrics",
"//pkg/server",
"//pkg/server/internal/testserverclient",
Expand All @@ -36,12 +37,12 @@ go_test(
"//pkg/session",
"//pkg/store/mockstore/unistore",
"//pkg/testkit",
"//pkg/testkit/testfailpoint",
"//pkg/testkit/testsetup",
"//pkg/types",
"//pkg/util/stmtsummary/v2:stmtsummary",
"//pkg/util/topsql/state",
"@com_github_gorilla_mux//:mux",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//require",
"@com_github_tikv_client_go_v2//tikv",
"@org_uber_go_goleak//:goleak",
Expand Down
27 changes: 22 additions & 5 deletions pkg/server/handler/extractorhandler/extract_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,16 +24,17 @@ import (
"time"

"github.com/gorilla/mux"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/config"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/kv"
server2 "github.com/pingcap/tidb/pkg/server"
"github.com/pingcap/tidb/pkg/server/handler/extractorhandler"
"github.com/pingcap/tidb/pkg/server/internal/testserverclient"
"github.com/pingcap/tidb/pkg/server/internal/testutil"
"github.com/pingcap/tidb/pkg/server/internal/util"
"github.com/pingcap/tidb/pkg/session"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
"github.com/pingcap/tidb/pkg/types"
stmtsummaryv2 "github.com/pingcap/tidb/pkg/util/stmtsummary/v2"
"github.com/stretchr/testify/require"
Expand All @@ -44,7 +45,23 @@ func TestExtractHandler(t *testing.T) {
defer closeStmtSummary()

store := testkit.CreateMockStore(t)
testExtractHandler(t, store)
}

func TestExtractHandlerInfoSchemaV2(t *testing.T) {
setupStmtSummary()
defer closeStmtSummary()

store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set @@global.tidb_schema_cache_size = 600*1024*1024")
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/infoschema/skipGet", `return()`)

testExtractHandler(t, store)
}

func testExtractHandler(t *testing.T, store kv.Storage) {
server2.RunInGoTestChan = make(chan struct{})
driver := server2.NewTiDBDriver(store)
client := testserverclient.NewTestServerClient()
cfg := util.NewTestConfig()
Expand All @@ -59,11 +76,14 @@ func TestExtractHandler(t *testing.T) {
dom, err := session.GetDomain(store)
require.NoError(t, err)
server.SetDomain(dom)
tk := testkit.NewTestKit(t, store)
tk.MustExec("set @@global.tidb_schema_cache_size = 600*1024*1024")
go func() {
err := server.Run(nil)
require.NoError(t, err)
}()
<-server2.RunInGoTestChan

client.Port = testutil.GetPortFromTCPAddr(server.ListenAddr())
client.StatusPort = testutil.GetPortFromTCPAddr(server.StatusListenerAddr())
client.WaitUntilServerOnline()
Expand All @@ -75,10 +95,7 @@ func TestExtractHandler(t *testing.T) {
eh := &extractorhandler.ExtractTaskServeHandler{ExtractHandler: dom.GetExtractHandle()}
router := mux.NewRouter()
router.Handle("/extract_task/dump", eh)
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/server/extractTaskServeHandler", `return(true)`))
defer func() {
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/server/extractTaskServeHandler"))
}()
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/server/extractTaskServeHandler", `return(true)`)
resp0, err := client.FetchStatus(fmt.Sprintf("/extract_task/dump?type=plan&begin=%s&end=%s",
url.QueryEscape(startTime.Format(types.TimeFormat)), url.QueryEscape(endTime.Format(types.TimeFormat))))
defer os.RemoveAll(domain.GetExtractTaskDirName())
Expand Down
15 changes: 12 additions & 3 deletions pkg/sessionctx/variable/varsutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,12 +590,21 @@ func ParseAnalyzeSkipColumnTypes(val string) map[string]struct{} {
return skipTypes
}

var (
// SchemaCacheSizeLowerBound will adjust the schema cache size to this value if
// it is lower than this value.
SchemaCacheSizeLowerBound uint64 = 512 * units.MiB
// SchemaCacheSizeLowerBoundStr is the string representation of
// SchemaCacheSizeLowerBound.
SchemaCacheSizeLowerBoundStr = "512MB"
)

func parseSchemaCacheSize(s *SessionVars, normalizedValue string, originalValue string) (byteSize uint64, normalizedStr string, err error) {
defer func() {
if err == nil && byteSize > 0 && byteSize < (512*units.MiB) {
if err == nil && byteSize > 0 && byteSize < SchemaCacheSizeLowerBound {
s.StmtCtx.AppendWarning(ErrTruncatedWrongValue.FastGenByArgs(TiDBSchemaCacheSize, originalValue))
byteSize = 512 * units.MiB
normalizedStr = "512MB"
byteSize = SchemaCacheSizeLowerBound
normalizedStr = SchemaCacheSizeLowerBoundStr
}
if err == nil && byteSize > math.MaxInt64 {
s.StmtCtx.AppendWarning(ErrTruncatedWrongValue.FastGenByArgs(TiDBSchemaCacheSize, originalValue))
Expand Down

0 comments on commit d940b7d

Please sign in to comment.