From ec9aedaf5eb3a6931595b32dcdf6879dbb620d5a Mon Sep 17 00:00:00 2001 From: MyonKeminta <9948422+MyonKeminta@users.noreply.github.com> Date: Thu, 19 Dec 2024 19:08:12 +0800 Subject: [PATCH] *: Update client-go and verify all read ts (#58054) ref pingcap/tidb#57786 --- DEPS.bzl | 120 ++++++++++---------- go.mod | 2 + go.sum | 4 +- pkg/ddl/column_change_test.go | 31 +++-- pkg/ddl/column_test.go | 47 +++++--- pkg/ddl/db_integration_test.go | 22 ++-- pkg/ddl/index_change_test.go | 4 +- pkg/ddl/job_worker_test.go | 4 +- pkg/executor/set.go | 3 +- pkg/executor/test/executor/executor_test.go | 3 +- pkg/executor/test/writetest/BUILD.bazel | 1 - pkg/executor/test/writetest/write_test.go | 7 +- pkg/planner/core/planbuilder.go | 4 +- pkg/sessionctx/context.go | 9 +- pkg/sessiontxn/staleread/processor.go | 2 +- pkg/sessiontxn/staleread/util.go | 2 +- pkg/store/copr/BUILD.bazel | 1 + pkg/store/copr/batch_coprocessor.go | 2 +- pkg/store/copr/batch_request_sender.go | 5 +- pkg/store/copr/mpp.go | 2 +- pkg/util/mock/BUILD.bazel | 1 + pkg/util/mock/context.go | 73 +++++++----- pkg/util/mock/fortest.go | 2 + 23 files changed, 205 insertions(+), 146 deletions(-) diff --git a/DEPS.bzl b/DEPS.bzl index 259062a72aec1..8623cf07e7982 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -2855,13 +2855,13 @@ def go_deps(): name = "com_github_golang_jwt_jwt_v4", build_file_proto_mode = "disable_global", importpath = "github.com/golang-jwt/jwt/v4", - sha256 = "a05e4849f6b52d84154e9bc37fca7f340bb85d9cce2ce180a09ae70758f6890c", - strip_prefix = "github.com/golang-jwt/jwt/v4@v4.5.1", + sha256 = "fdb3b9c078eba9a2bd437c1b3acdf98ee09d276121a97b4bc7f6d870eb5ff75b", + strip_prefix = "github.com/golang-jwt/jwt/v4@v4.5.0", urls = [ - "http://bazel-cache.pingcap.net:8080/gomod/github.com/golang-jwt/jwt/v4/com_github_golang_jwt_jwt_v4-v4.5.1.zip", - "http://ats.apps.svc/gomod/github.com/golang-jwt/jwt/v4/com_github_golang_jwt_jwt_v4-v4.5.1.zip", - "https://cache.hawkingrei.com/gomod/github.com/golang-jwt/jwt/v4/com_github_golang_jwt_jwt_v4-v4.5.1.zip", - "https://storage.googleapis.com/pingcapmirror/gomod/github.com/golang-jwt/jwt/v4/com_github_golang_jwt_jwt_v4-v4.5.1.zip", + "http://bazel-cache.pingcap.net:8080/gomod/github.com/golang-jwt/jwt/v4/com_github_golang_jwt_jwt_v4-v4.5.0.zip", + "http://ats.apps.svc/gomod/github.com/golang-jwt/jwt/v4/com_github_golang_jwt_jwt_v4-v4.5.0.zip", + "https://cache.hawkingrei.com/gomod/github.com/golang-jwt/jwt/v4/com_github_golang_jwt_jwt_v4-v4.5.0.zip", + "https://storage.googleapis.com/pingcapmirror/gomod/github.com/golang-jwt/jwt/v4/com_github_golang_jwt_jwt_v4-v4.5.0.zip", ], ) go_repository( @@ -5438,26 +5438,26 @@ def go_deps(): name = "com_github_onsi_ginkgo_v2", build_file_proto_mode = "disable_global", importpath = "github.com/onsi/ginkgo/v2", - sha256 = "4865aab6c56b0d29a93cfe56206b586f1c9f36fde5a66e85650576344861b7cc", - strip_prefix = "github.com/onsi/ginkgo/v2@v2.13.0", + sha256 = "f41e92baa52ec53d482603e4585c0906ca0c02e05004dca78a62bf1de88833ad", + strip_prefix = "github.com/onsi/ginkgo/v2@v2.9.4", urls = [ - "http://bazel-cache.pingcap.net:8080/gomod/github.com/onsi/ginkgo/v2/com_github_onsi_ginkgo_v2-v2.13.0.zip", - "http://ats.apps.svc/gomod/github.com/onsi/ginkgo/v2/com_github_onsi_ginkgo_v2-v2.13.0.zip", - "https://cache.hawkingrei.com/gomod/github.com/onsi/ginkgo/v2/com_github_onsi_ginkgo_v2-v2.13.0.zip", - "https://storage.googleapis.com/pingcapmirror/gomod/github.com/onsi/ginkgo/v2/com_github_onsi_ginkgo_v2-v2.13.0.zip", + "http://bazel-cache.pingcap.net:8080/gomod/github.com/onsi/ginkgo/v2/com_github_onsi_ginkgo_v2-v2.9.4.zip", + "http://ats.apps.svc/gomod/github.com/onsi/ginkgo/v2/com_github_onsi_ginkgo_v2-v2.9.4.zip", + "https://cache.hawkingrei.com/gomod/github.com/onsi/ginkgo/v2/com_github_onsi_ginkgo_v2-v2.9.4.zip", + "https://storage.googleapis.com/pingcapmirror/gomod/github.com/onsi/ginkgo/v2/com_github_onsi_ginkgo_v2-v2.9.4.zip", ], ) go_repository( name = "com_github_onsi_gomega", build_file_proto_mode = "disable_global", importpath = "github.com/onsi/gomega", - sha256 = "923e8d0a1f95b3989f31c45142dee0b80a0aaa00cfa210bbd4d059f7046d12a8", - strip_prefix = "github.com/onsi/gomega@v1.29.0", + sha256 = "ea2b22782cc15569645dfdfc066a651e1335626677ad92d7ba4358a0885bf369", + strip_prefix = "github.com/onsi/gomega@v1.20.1", urls = [ - "http://bazel-cache.pingcap.net:8080/gomod/github.com/onsi/gomega/com_github_onsi_gomega-v1.29.0.zip", - "http://ats.apps.svc/gomod/github.com/onsi/gomega/com_github_onsi_gomega-v1.29.0.zip", - "https://cache.hawkingrei.com/gomod/github.com/onsi/gomega/com_github_onsi_gomega-v1.29.0.zip", - "https://storage.googleapis.com/pingcapmirror/gomod/github.com/onsi/gomega/com_github_onsi_gomega-v1.29.0.zip", + "http://bazel-cache.pingcap.net:8080/gomod/github.com/onsi/gomega/com_github_onsi_gomega-v1.20.1.zip", + "http://ats.apps.svc/gomod/github.com/onsi/gomega/com_github_onsi_gomega-v1.20.1.zip", + "https://cache.hawkingrei.com/gomod/github.com/onsi/gomega/com_github_onsi_gomega-v1.20.1.zip", + "https://storage.googleapis.com/pingcapmirror/gomod/github.com/onsi/gomega/com_github_onsi_gomega-v1.20.1.zip", ], ) go_repository( @@ -5815,13 +5815,13 @@ def go_deps(): name = "com_github_pingcap_tipb", build_file_proto_mode = "disable_global", importpath = "github.com/pingcap/tipb", - sha256 = "1b707429b5b938a05b250b5770be2a6aa243d6a4983d23b01bbca164e86b3e3c", - strip_prefix = "github.com/pingcap/tipb@v0.0.0-20241022082558-0607513e7fa4", + sha256 = "b39e154272ba36d145c6049947a012a76be740b32a44a46d7253caa145c56cc9", + strip_prefix = "github.com/pingcap/tipb@v0.0.0-20241008083645-0bcddae67837", urls = [ - "http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20241022082558-0607513e7fa4.zip", - "http://ats.apps.svc/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20241022082558-0607513e7fa4.zip", - "https://cache.hawkingrei.com/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20241022082558-0607513e7fa4.zip", - "https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20241022082558-0607513e7fa4.zip", + "http://bazel-cache.pingcap.net:8080/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20241008083645-0bcddae67837.zip", + "http://ats.apps.svc/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20241008083645-0bcddae67837.zip", + "https://cache.hawkingrei.com/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20241008083645-0bcddae67837.zip", + "https://storage.googleapis.com/pingcapmirror/gomod/github.com/pingcap/tipb/com_github_pingcap_tipb-v0.0.0-20241008083645-0bcddae67837.zip", ], ) go_repository( @@ -5958,13 +5958,13 @@ def go_deps(): name = "com_github_prometheus_client_golang", build_file_proto_mode = "disable_global", importpath = "github.com/prometheus/client_golang", - sha256 = "b76de10864f49c87a347b9a3e6fe606c1f93ed091de7d0d1d17a5967a60f5ce2", - strip_prefix = "github.com/prometheus/client_golang@v1.20.5", + sha256 = "cbd030de8f05e9b09cfe58890248629bc522f73a250ebacac26cb654dd88b709", + strip_prefix = "github.com/prometheus/client_golang@v1.20.4", urls = [ - "http://bazel-cache.pingcap.net:8080/gomod/github.com/prometheus/client_golang/com_github_prometheus_client_golang-v1.20.5.zip", - "http://ats.apps.svc/gomod/github.com/prometheus/client_golang/com_github_prometheus_client_golang-v1.20.5.zip", - "https://cache.hawkingrei.com/gomod/github.com/prometheus/client_golang/com_github_prometheus_client_golang-v1.20.5.zip", - "https://storage.googleapis.com/pingcapmirror/gomod/github.com/prometheus/client_golang/com_github_prometheus_client_golang-v1.20.5.zip", + "http://bazel-cache.pingcap.net:8080/gomod/github.com/prometheus/client_golang/com_github_prometheus_client_golang-v1.20.4.zip", + "http://ats.apps.svc/gomod/github.com/prometheus/client_golang/com_github_prometheus_client_golang-v1.20.4.zip", + "https://cache.hawkingrei.com/gomod/github.com/prometheus/client_golang/com_github_prometheus_client_golang-v1.20.4.zip", + "https://storage.googleapis.com/pingcapmirror/gomod/github.com/prometheus/client_golang/com_github_prometheus_client_golang-v1.20.4.zip", ], ) go_repository( @@ -6933,26 +6933,26 @@ def go_deps(): name = "com_github_tikv_client_go_v2", build_file_proto_mode = "disable_global", importpath = "github.com/tikv/client-go/v2", - sha256 = "cdcad188042c4d716dd9d4a304a2e36bc9d4edccaf86a19b85b1682f01df193c", - strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20241121061241-006dfb024c26", + sha256 = "2b016e2f29a1764f0a6ad3ad2267407549a28d68703f34acb2994ab774bc6263", + strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20241015065014-8dfa86b5d1db", urls = [ - "http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241121061241-006dfb024c26.zip", - "http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241121061241-006dfb024c26.zip", - "https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241121061241-006dfb024c26.zip", - "https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241121061241-006dfb024c26.zip", + "http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241015065014-8dfa86b5d1db.zip", + "http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241015065014-8dfa86b5d1db.zip", + "https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241015065014-8dfa86b5d1db.zip", + "https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20241015065014-8dfa86b5d1db.zip", ], ) go_repository( name = "com_github_tikv_pd_client", build_file_proto_mode = "disable_global", importpath = "github.com/tikv/pd/client", - sha256 = "52a62b6f6247ce31ee9d0a5dbde941ba3be3db74a713fd79643d015d98a15c5f", - strip_prefix = "github.com/tikv/pd/client@v0.0.0-20241111073742-238d4d79ea31", + sha256 = "ddfcb88a8b79c238b08c15c88f8142211cb9f64435c5ad371f682d8c81c8cc42", + strip_prefix = "github.com/tikv/pd/client@v0.0.0-20240926021936-642f0e919b0d", urls = [ - "http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20241111073742-238d4d79ea31.zip", - "http://ats.apps.svc/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20241111073742-238d4d79ea31.zip", - "https://cache.hawkingrei.com/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20241111073742-238d4d79ea31.zip", - "https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20241111073742-238d4d79ea31.zip", + "http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240926021936-642f0e919b0d.zip", + "http://ats.apps.svc/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240926021936-642f0e919b0d.zip", + "https://cache.hawkingrei.com/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240926021936-642f0e919b0d.zip", + "https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20240926021936-642f0e919b0d.zip", ], ) go_repository( @@ -9468,26 +9468,26 @@ def go_deps(): name = "io_k8s_api", build_file_proto_mode = "disable_global", importpath = "k8s.io/api", - sha256 = "ae7b519f36431bc55fa56c47a51c1c37cf9e0df7e9d23741b3e839426d2627ff", - strip_prefix = "k8s.io/api@v0.29.11", + sha256 = "2255428d2347df0b3a9cf6ac2791f5be6653b3c642359736e46733584d917335", + strip_prefix = "k8s.io/api@v0.28.6", urls = [ - "http://bazel-cache.pingcap.net:8080/gomod/k8s.io/api/io_k8s_api-v0.29.11.zip", - "http://ats.apps.svc/gomod/k8s.io/api/io_k8s_api-v0.29.11.zip", - "https://cache.hawkingrei.com/gomod/k8s.io/api/io_k8s_api-v0.29.11.zip", - "https://storage.googleapis.com/pingcapmirror/gomod/k8s.io/api/io_k8s_api-v0.29.11.zip", + "http://bazel-cache.pingcap.net:8080/gomod/k8s.io/api/io_k8s_api-v0.28.6.zip", + "http://ats.apps.svc/gomod/k8s.io/api/io_k8s_api-v0.28.6.zip", + "https://cache.hawkingrei.com/gomod/k8s.io/api/io_k8s_api-v0.28.6.zip", + "https://storage.googleapis.com/pingcapmirror/gomod/k8s.io/api/io_k8s_api-v0.28.6.zip", ], ) go_repository( name = "io_k8s_apimachinery", build_file_proto_mode = "disable_global", importpath = "k8s.io/apimachinery", - sha256 = "8dd5f53bf72f7bd6323bcc8f9f45823b30fc350daee4ab2d9e27cf1960d06b25", - strip_prefix = "k8s.io/apimachinery@v0.29.11", + sha256 = "efc7e38cb4662d0b6c5648772e1ae92040a4d03af0a3a7731aedf17f8eab7359", + strip_prefix = "k8s.io/apimachinery@v0.28.6", urls = [ - "http://bazel-cache.pingcap.net:8080/gomod/k8s.io/apimachinery/io_k8s_apimachinery-v0.29.11.zip", - "http://ats.apps.svc/gomod/k8s.io/apimachinery/io_k8s_apimachinery-v0.29.11.zip", - "https://cache.hawkingrei.com/gomod/k8s.io/apimachinery/io_k8s_apimachinery-v0.29.11.zip", - "https://storage.googleapis.com/pingcapmirror/gomod/k8s.io/apimachinery/io_k8s_apimachinery-v0.29.11.zip", + "http://bazel-cache.pingcap.net:8080/gomod/k8s.io/apimachinery/io_k8s_apimachinery-v0.28.6.zip", + "http://ats.apps.svc/gomod/k8s.io/apimachinery/io_k8s_apimachinery-v0.28.6.zip", + "https://cache.hawkingrei.com/gomod/k8s.io/apimachinery/io_k8s_apimachinery-v0.28.6.zip", + "https://storage.googleapis.com/pingcapmirror/gomod/k8s.io/apimachinery/io_k8s_apimachinery-v0.28.6.zip", ], ) go_repository( @@ -10196,13 +10196,13 @@ def go_deps(): name = "org_golang_x_time", build_file_proto_mode = "disable_global", importpath = "golang.org/x/time", - sha256 = "ce9157f4961055bd942bbb02666ca5be9f92c92f8e64361aede9d1090df44464", - strip_prefix = "golang.org/x/time@v0.7.0", + sha256 = "e0e5812d19aed367f79ac0ae0ce4770b6602c85f5cfb8d59f3f573c7487ea516", + strip_prefix = "golang.org/x/time@v0.5.0", urls = [ - "http://bazel-cache.pingcap.net:8080/gomod/golang.org/x/time/org_golang_x_time-v0.7.0.zip", - "http://ats.apps.svc/gomod/golang.org/x/time/org_golang_x_time-v0.7.0.zip", - "https://cache.hawkingrei.com/gomod/golang.org/x/time/org_golang_x_time-v0.7.0.zip", - "https://storage.googleapis.com/pingcapmirror/gomod/golang.org/x/time/org_golang_x_time-v0.7.0.zip", + "http://bazel-cache.pingcap.net:8080/gomod/golang.org/x/time/org_golang_x_time-v0.5.0.zip", + "http://ats.apps.svc/gomod/golang.org/x/time/org_golang_x_time-v0.5.0.zip", + "https://cache.hawkingrei.com/gomod/golang.org/x/time/org_golang_x_time-v0.5.0.zip", + "https://storage.googleapis.com/pingcapmirror/gomod/golang.org/x/time/org_golang_x_time-v0.5.0.zip", ], ) go_repository( diff --git a/go.mod b/go.mod index a087a45626fcf..c7a9ac17b6f0e 100644 --- a/go.mod +++ b/go.mod @@ -327,3 +327,5 @@ replace ( sourcegraph.com/sourcegraph/appdash => github.com/sourcegraph/appdash v0.0.0-20190731080439-ebfcffb1b5c0 sourcegraph.com/sourcegraph/appdash-data => github.com/sourcegraph/appdash-data v0.0.0-20151005221446-73f23eafcf67 ) + +replace github.com/tikv/client-go/v2 => github.com/you06/client-go/v2 v2.0.0-alpha.0.20250107031659-c2c7c5ceda14 diff --git a/go.sum b/go.sum index 2bbcb38c0579c..83e347fdf97df 100644 --- a/go.sum +++ b/go.sum @@ -824,8 +824,6 @@ github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM= github.com/tidwall/btree v1.7.0 h1:L1fkJH/AuEh5zBnnBbmTwQ5Lt+bRJ5A8EWecslvo9iI= github.com/tidwall/btree v1.7.0/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY= -github.com/tikv/client-go/v2 v2.0.8-0.20241121061241-006dfb024c26 h1:CwiOzQZl7qCJi4QhNbzptX0hJoG10Q/gyLc5QULNW7I= -github.com/tikv/client-go/v2 v2.0.8-0.20241121061241-006dfb024c26/go.mod h1:p9zPFlKBrxhp3b/cBmKBWL9M0X4HtJjgi1ThUtQYF7o= github.com/tikv/pd/client v0.0.0-20241111073742-238d4d79ea31 h1:oAYc4m5Eu1OY9ogJ103VO47AYPHvhtzbUPD8L8B67Qk= github.com/tikv/pd/client v0.0.0-20241111073742-238d4d79ea31/go.mod h1:W5a0sDadwUpI9k8p7M77d3jo253ZHdmua+u4Ho4Xw8U= github.com/timakin/bodyclose v0.0.0-20240125160201-f835fa56326a h1:A6uKudFIfAEpoPdaal3aSqGxBzLyU8TqyXImLwo6dIo= @@ -858,6 +856,8 @@ github.com/xitongsys/parquet-go-source v0.0.0-20190524061010-2b72cbee77d5/go.mod github.com/xitongsys/parquet-go-source v0.0.0-20200817004010-026bad9b25d0 h1:a742S4V5A15F93smuVxA60LQWsrCnN8bKeWDBARU1/k= github.com/xitongsys/parquet-go-source v0.0.0-20200817004010-026bad9b25d0/go.mod h1:HYhIKsdns7xz80OgkbgJYrtQY7FjHWHKH6cvN7+czGE= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= +github.com/you06/client-go/v2 v2.0.0-alpha.0.20250107031659-c2c7c5ceda14 h1:RVb/3HX62v1RQpF+NBt9mtIoORv1bXAplThAg2/F3yo= +github.com/you06/client-go/v2 v2.0.0-alpha.0.20250107031659-c2c7c5ceda14/go.mod h1:p9zPFlKBrxhp3b/cBmKBWL9M0X4HtJjgi1ThUtQYF7o= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= diff --git a/pkg/ddl/column_change_test.go b/pkg/ddl/column_change_test.go index 5c73d0b598239..2ebe1c09d6954 100644 --- a/pkg/ddl/column_change_test.go +++ b/pkg/ddl/column_change_test.go @@ -35,7 +35,6 @@ import ( "github.com/pingcap/tidb/pkg/testkit/external" "github.com/pingcap/tidb/pkg/testkit/testfailpoint" "github.com/pingcap/tidb/pkg/types" - "github.com/pingcap/tidb/pkg/util/mock" "github.com/stretchr/testify/require" ) @@ -48,7 +47,7 @@ func TestColumnAdd(t *testing.T) { tk.MustExec("create table t (c1 int, c2 int);") tk.MustExec("insert t values (1, 2);") - ct := testNewContext(store) + ct := testNewContext(t, store) // set up hook var ( deleteOnlyTable table.Table @@ -120,7 +119,7 @@ func TestColumnAdd(t *testing.T) { return } first = false - sess := testNewContext(store) + sess := testNewContext(t, store) txn, err := newTxn(sess) require.NoError(t, err) _, err = writeOnlyTable.AddRecord(sess.GetTableCtx(), txn, types.MakeDatums(10, 10)) @@ -210,6 +209,10 @@ func checkAddWriteOnly(ctx sessionctx.Context, deleteOnlyTable, writeOnlyTable t if err != nil { return errors.Trace(err) } + err = txn.Commit(context.Background()) + if err != nil { + return errors.Trace(err) + } txn, err = newTxn(ctx) if err != nil { return errors.Trace(err) @@ -248,6 +251,10 @@ func checkAddWriteOnly(ctx sessionctx.Context, deleteOnlyTable, writeOnlyTable t if err != nil { return errors.Trace(err) } + err = txn.Commit(context.Background()) + if err != nil { + return errors.Trace(err) + } txn, err = newTxn(ctx) if err != nil { return errors.Trace(err) @@ -265,6 +272,10 @@ func checkAddWriteOnly(ctx sessionctx.Context, deleteOnlyTable, writeOnlyTable t if err != nil { return errors.Trace(err) } + err = txn.Commit(context.Background()) + if err != nil { + return errors.Trace(err) + } _, err = newTxn(ctx) if err != nil { return errors.Trace(err) @@ -294,6 +305,10 @@ func checkAddPublic(sctx sessionctx.Context, writeOnlyTable, publicTable table.T if err != nil { return errors.Trace(err) } + err = txn.Commit(context.Background()) + if err != nil { + return errors.Trace(err) + } txn, err = newTxn(sctx) if err != nil { return errors.Trace(err) @@ -311,6 +326,10 @@ func checkAddPublic(sctx sessionctx.Context, writeOnlyTable, publicTable table.T if err != nil { return errors.Trace(err) } + err = txn.Commit(context.Background()) + if err != nil { + return errors.Trace(err) + } _, err = newTxn(sctx) if err != nil { return errors.Trace(err) @@ -415,10 +434,8 @@ func testCheckJobDone(t *testing.T, store kv.Storage, jobID int64, isAdd bool) { } } -func testNewContext(store kv.Storage) sessionctx.Context { - ctx := mock.NewContext() - ctx.Store = store - return ctx +func testNewContext(t *testing.T, store kv.Storage) sessionctx.Context { + return testkit.NewSession(t, store) } func TestIssue40135(t *testing.T) { diff --git a/pkg/ddl/column_test.go b/pkg/ddl/column_test.go index 24508517a60bc..6dd34b0774b85 100644 --- a/pkg/ddl/column_test.go +++ b/pkg/ddl/column_test.go @@ -167,7 +167,7 @@ func TestColumnBasic(t *testing.T) { tk.MustExec(fmt.Sprintf("insert into t1 values(%d, %d, %d)", i, 10*i, 100*i)) } - ctx := testNewContext(store) + ctx := testNewContext(t, store) txn, err := newTxn(ctx) require.NoError(t, err) @@ -214,6 +214,8 @@ func TestColumnBasic(t *testing.T) { h, err := tbl.AddRecord(ctx.GetTableCtx(), txn, types.MakeDatums(11, 12, 13, 14)) require.NoError(t, err) + err = txn.Commit(context.Background()) + require.NoError(t, err) _, err = newTxn(ctx) require.NoError(t, err) values, err := tables.RowWithCols(tbl, ctx, h, tbl.Cols()) @@ -379,7 +381,9 @@ func checkDeleteOnlyColumn(t *testing.T, ctx sessionctx.Context, tableID int64, newRow := types.MakeDatums(int64(11), int64(22), int64(33)) newHandle, err := tbl.AddRecord(ctx.GetTableCtx(), txn, newRow) require.NoError(t, err) - _, err = newTxn(ctx) + err = txn.Commit(context.Background()) + require.NoError(t, err) + txn, err = newTxn(ctx) require.NoError(t, err) rows := [][]types.Datum{row, newRow} @@ -401,7 +405,9 @@ func checkDeleteOnlyColumn(t *testing.T, ctx sessionctx.Context, tableID int64, err = tbl.RemoveRecord(ctx.GetTableCtx(), txn, newHandle, newRow) require.NoError(t, err) - _, err = newTxn(ctx) + err = txn.Commit(context.Background()) + require.NoError(t, err) + txn, err = newTxn(ctx) require.NoError(t, err) i = 0 err = tables.IterRecords(tbl, ctx, tbl.Cols(), func(_ kv.Handle, data []types.Datum, cols []*table.Column) (bool, error) { @@ -441,7 +447,9 @@ func checkWriteOnlyColumn(t *testing.T, ctx sessionctx.Context, tableID int64, h newRow := types.MakeDatums(int64(11), int64(22), int64(33)) newHandle, err := tbl.AddRecord(ctx.GetTableCtx(), txn, newRow) require.NoError(t, err) - _, err = newTxn(ctx) + err = txn.Commit(context.Background()) + require.NoError(t, err) + txn, err = newTxn(ctx) require.NoError(t, err) rows := [][]types.Datum{row, newRow} @@ -463,7 +471,10 @@ func checkWriteOnlyColumn(t *testing.T, ctx sessionctx.Context, tableID int64, h err = tbl.RemoveRecord(ctx.GetTableCtx(), txn, newHandle, newRow) require.NoError(t, err) - _, err = newTxn(ctx) + + err = txn.Commit(context.Background()) + require.NoError(t, err) + txn, err = newTxn(ctx) require.NoError(t, err) i = 0 @@ -501,7 +512,9 @@ func checkReorganizationColumn(t *testing.T, ctx sessionctx.Context, tableID int newRow := types.MakeDatums(int64(11), int64(22), int64(33)) newHandle, err := tbl.AddRecord(ctx.GetTableCtx(), txn, newRow) require.NoError(t, err) - _, err = newTxn(ctx) + err = txn.Commit(context.Background()) + require.NoError(t, err) + txn, err = newTxn(ctx) require.NoError(t, err) rows := [][]types.Datum{row, newRow} @@ -524,7 +537,9 @@ func checkReorganizationColumn(t *testing.T, ctx sessionctx.Context, tableID int err = tbl.RemoveRecord(ctx.GetTableCtx(), txn, newHandle, newRow) require.NoError(t, err) - _, err = newTxn(ctx) + err = txn.Commit(context.Background()) + require.NoError(t, err) + txn, err = newTxn(ctx) require.NoError(t, err) i = 0 @@ -567,7 +582,9 @@ func checkPublicColumn(t *testing.T, ctx sessionctx.Context, tableID int64, newC } handle, err := tbl.AddRecord(ctx.GetTableCtx(), txn, newRow) require.NoError(t, err) - _, err = newTxn(ctx) + err = txn.Commit(context.Background()) + require.NoError(t, err) + txn, err = newTxn(ctx) require.NoError(t, err) rows := [][]types.Datum{updatedRow, newRow} @@ -587,8 +604,10 @@ func checkPublicColumn(t *testing.T, ctx sessionctx.Context, tableID int64, newC err = tbl.RemoveRecord(ctx.GetTableCtx(), txn, handle, newRow) require.NoError(t, err) + err = txn.Commit(context.Background()) + require.NoError(t, err) - _, err = newTxn(ctx) + txn, err = newTxn(ctx) require.NoError(t, err) i = 0 @@ -605,7 +624,7 @@ func checkPublicColumn(t *testing.T, ctx sessionctx.Context, tableID int64, newC } func checkAddColumn(t *testing.T, state model.SchemaState, tableID int64, handle kv.Handle, newCol *table.Column, oldRow []types.Datum, columnValue any, dom *domain.Domain, store kv.Storage, columnCnt int) { - ctx := testNewContext(store) + ctx := testNewContext(t, store) switch state { case model.StateNone: checkNoneColumn(t, ctx, tableID, handle, newCol, columnValue, dom) @@ -647,7 +666,7 @@ func TestAddColumn(t *testing.T) { tableID = int64(tableIDi) tbl := testGetTable(t, dom, tableID) - ctx := testNewContext(store) + ctx := testNewContext(t, store) txn, err := newTxn(ctx) require.NoError(t, err) oldRow := types.MakeDatums(int64(1), int64(2), int64(3)) @@ -712,7 +731,7 @@ func TestAddColumns(t *testing.T) { tableID = int64(tableIDi) tbl := testGetTable(t, dom, tableID) - ctx := testNewContext(store) + ctx := testNewContext(t, store) txn, err := newTxn(ctx) require.NoError(t, err) oldRow := types.MakeDatums(int64(1), int64(2), int64(3)) @@ -769,7 +788,7 @@ func TestDropColumnInColumnTest(t *testing.T) { tableID = int64(tableIDi) tbl := testGetTable(t, dom, tableID) - ctx := testNewContext(store) + ctx := testNewContext(t, store) colName := "c4" defaultColValue := int64(4) row := types.MakeDatums(int64(1), int64(2), int64(3)) @@ -823,7 +842,7 @@ func TestDropColumns(t *testing.T) { tableID = int64(tableIDi) tbl := testGetTable(t, dom, tableID) - ctx := testNewContext(store) + ctx := testNewContext(t, store) txn, err := newTxn(ctx) require.NoError(t, err) diff --git a/pkg/ddl/db_integration_test.go b/pkg/ddl/db_integration_test.go index 7721bd6dff3a7..a9f2f26b60184 100644 --- a/pkg/ddl/db_integration_test.go +++ b/pkg/ddl/db_integration_test.go @@ -53,7 +53,6 @@ import ( contextutil "github.com/pingcap/tidb/pkg/util/context" "github.com/pingcap/tidb/pkg/util/dbterror" "github.com/pingcap/tidb/pkg/util/dbterror/plannererrors" - "github.com/pingcap/tidb/pkg/util/mock" "github.com/stretchr/testify/require" ) @@ -523,11 +522,10 @@ func TestChangingTableCharset(t *testing.T) { tblInfo.Charset = "" tblInfo.Collate = "" updateTableInfo := func(tblInfo *model.TableInfo) { - mockCtx := mock.NewContext() - mockCtx.Store = store - err := sessiontxn.NewTxn(context.Background(), mockCtx) + ctx := testkit.NewSession(t, store) + err := sessiontxn.NewTxn(context.Background(), ctx) require.NoError(t, err) - txn, err := mockCtx.Txn(true) + txn, err := ctx.Txn(true) require.NoError(t, err) mt := meta.NewMutator(txn) @@ -769,11 +767,10 @@ func TestCaseInsensitiveCharsetAndCollate(t *testing.T) { tblInfo.Charset = "UTF8MB4" updateTableInfo := func(tblInfo *model.TableInfo) { - mockCtx := mock.NewContext() - mockCtx.Store = store - err := sessiontxn.NewTxn(context.Background(), mockCtx) + sctx := testkit.NewSession(t, store) + err := sessiontxn.NewTxn(context.Background(), sctx) require.NoError(t, err) - txn, err := mockCtx.Txn(true) + txn, err := sctx.Txn(true) require.NoError(t, err) mt := meta.NewMutator(txn) require.True(t, ok) @@ -1437,11 +1434,10 @@ func TestTreatOldVersionUTF8AsUTF8MB4(t *testing.T) { tblInfo.Version = model.TableInfoVersion0 tblInfo.Columns[0].Version = model.ColumnInfoVersion0 updateTableInfo := func(tblInfo *model.TableInfo) { - mockCtx := mock.NewContext() - mockCtx.Store = store - err := sessiontxn.NewTxn(context.Background(), mockCtx) + sctx := testkit.NewSession(t, store) + err := sessiontxn.NewTxn(context.Background(), sctx) require.NoError(t, err) - txn, err := mockCtx.Txn(true) + txn, err := sctx.Txn(true) require.NoError(t, err) mt := meta.NewMutator(txn) require.True(t, ok) diff --git a/pkg/ddl/index_change_test.go b/pkg/ddl/index_change_test.go index c65f3e5105cbc..62574e9ca9f37 100644 --- a/pkg/ddl/index_change_test.go +++ b/pkg/ddl/index_change_test.go @@ -59,7 +59,7 @@ func TestIndexChange(t *testing.T) { return } jobID.Store(job.ID) - ctx1 := testNewContext(store) + ctx1 := testNewContext(t, store) prevState = job.SchemaState require.NoError(t, dom.Reload()) tbl, exist := dom.InfoSchema().TableByID(context.Background(), job.TableID) @@ -106,7 +106,7 @@ func TestIndexChange(t *testing.T) { require.NoError(t, dom.Reload()) tbl, exist := dom.InfoSchema().TableByID(context.Background(), job.TableID) require.True(t, exist) - ctx1 := testNewContext(store) + ctx1 := testNewContext(t, store) switch job.SchemaState { case model.StateWriteOnly: writeOnlyTable = tbl diff --git a/pkg/ddl/job_worker_test.go b/pkg/ddl/job_worker_test.go index d99e3f44b3fa0..a705b1442e927 100644 --- a/pkg/ddl/job_worker_test.go +++ b/pkg/ddl/job_worker_test.go @@ -53,7 +53,7 @@ func TestInvalidDDLJob(t *testing.T) { BinlogInfo: &model.HistoryInfo{}, InvolvingSchemaInfo: []model.InvolvingSchemaInfo{{}}, } - ctx := testNewContext(store) + ctx := testNewContext(t, store) ctx.SetValue(sessionctx.QueryString, "skip") de := dom.DDLExecutor().(ddl.ExecutorForTest) err := de.DoDDLJobWrapper(ctx, ddl.NewJobWrapperWithArgs(job, &model.EmptyArgs{}, true)) @@ -62,7 +62,7 @@ func TestInvalidDDLJob(t *testing.T) { func TestAddBatchJobError(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomainWithSchemaLease(t, testLease) - ctx := testNewContext(store) + ctx := testNewContext(t, store) require.Nil(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/mockAddBatchDDLJobsErr", `return(true)`)) // Test the job runner should not hang forever. diff --git a/pkg/executor/set.go b/pkg/executor/set.go index d39ce5bffe43c..7afec29349262 100644 --- a/pkg/executor/set.go +++ b/pkg/executor/set.go @@ -222,7 +222,8 @@ func (e *SetExecutor) setSysVariable(ctx context.Context, name string, v *expres newSnapshotTS := getSnapshotTSByName() newSnapshotIsSet := newSnapshotTS > 0 && newSnapshotTS != oldSnapshotTS if newSnapshotIsSet { - err = sessionctx.ValidateSnapshotReadTS(ctx, e.Ctx().GetStore(), newSnapshotTS) + isStaleRead := name == variable.TiDBTxnReadTS + err = sessionctx.ValidateSnapshotReadTS(ctx, e.Ctx().GetStore(), newSnapshotTS, isStaleRead) if name != variable.TiDBTxnReadTS { // Also check gc safe point for snapshot read. // We don't check snapshot with gc safe point for read_ts diff --git a/pkg/executor/test/executor/executor_test.go b/pkg/executor/test/executor/executor_test.go index e0901c3ffaef9..2982187a4c4bb 100644 --- a/pkg/executor/test/executor/executor_test.go +++ b/pkg/executor/test/executor/executor_test.go @@ -288,8 +288,7 @@ func TestNotFillCacheFlag(t *testing.T) { func TestCheckIndex(t *testing.T) { store, dom := testkit.CreateMockStoreAndDomain(t) - ctx := mock.NewContext() - ctx.Store = store + ctx := testkit.NewSession(t, store) se, err := session.CreateSession4Test(store) require.NoError(t, err) defer se.Close() diff --git a/pkg/executor/test/writetest/BUILD.bazel b/pkg/executor/test/writetest/BUILD.bazel index 9cc1d31104829..9274fe7730b26 100644 --- a/pkg/executor/test/writetest/BUILD.bazel +++ b/pkg/executor/test/writetest/BUILD.bazel @@ -26,7 +26,6 @@ go_test( "//pkg/testkit", "//pkg/types", "//pkg/util", - "//pkg/util/mock", "@com_github_stretchr_testify//require", "@com_github_tikv_client_go_v2//tikv", "@io_opencensus_go//stats/view", diff --git a/pkg/executor/test/writetest/write_test.go b/pkg/executor/test/writetest/write_test.go index 98def043bcea3..b1c420e3352bc 100644 --- a/pkg/executor/test/writetest/write_test.go +++ b/pkg/executor/test/writetest/write_test.go @@ -35,7 +35,6 @@ import ( "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util" - "github.com/pingcap/tidb/pkg/util/mock" "github.com/stretchr/testify/require" ) @@ -335,8 +334,7 @@ func TestReplaceLog(t *testing.T) { tk.MustExec(`create table testLog (a int not null primary key, b int unique key);`) // Make some dangling index. - ctx := mock.NewContext() - ctx.Store = store + ctx := testkit.NewSession(t, store) is := domain.InfoSchema() dbName := model.NewCIStr("test") tblName := model.NewCIStr("testLog") @@ -369,8 +367,7 @@ func TestRebaseIfNeeded(t *testing.T) { tk.MustExec(`create table t (a int not null primary key auto_increment, b int unique key);`) tk.MustExec(`insert into t (b) values (1);`) - ctx := mock.NewContext() - ctx.Store = store + ctx := testkit.NewSession(t, store) tbl, err := domain.InfoSchema().TableByName(context.Background(), model.NewCIStr("test"), model.NewCIStr("t")) require.NoError(t, err) require.Nil(t, sessiontxn.NewTxn(context.Background(), ctx)) diff --git a/pkg/planner/core/planbuilder.go b/pkg/planner/core/planbuilder.go index 2fb0daf4b6b08..6bec49fa60209 100644 --- a/pkg/planner/core/planbuilder.go +++ b/pkg/planner/core/planbuilder.go @@ -3656,7 +3656,7 @@ func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (base. if err != nil { return nil, err } - if err := sessionctx.ValidateSnapshotReadTS(ctx, b.ctx.GetStore(), startTS); err != nil { + if err := sessionctx.ValidateSnapshotReadTS(ctx, b.ctx.GetStore(), startTS, true); err != nil { return nil, err } p.StaleTxnStartTS = startTS @@ -3670,7 +3670,7 @@ func (b *PlanBuilder) buildSimple(ctx context.Context, node ast.StmtNode) (base. if err != nil { return nil, err } - if err := sessionctx.ValidateSnapshotReadTS(ctx, b.ctx.GetStore(), startTS); err != nil { + if err := sessionctx.ValidateSnapshotReadTS(ctx, b.ctx.GetStore(), startTS, true); err != nil { return nil, err } p.StaleTxnStartTS = startTS diff --git a/pkg/sessionctx/context.go b/pkg/sessionctx/context.go index b3cc26cb95376..7f27891fa599e 100644 --- a/pkg/sessionctx/context.go +++ b/pkg/sessionctx/context.go @@ -240,7 +240,10 @@ const ( LastExecuteDDL basicCtxType = 3 ) -// ValidateSnapshotReadTS strictly validates that readTS does not exceed the PD timestamp -func ValidateSnapshotReadTS(ctx context.Context, store kv.Storage, readTS uint64) error { - return store.GetOracle().ValidateSnapshotReadTS(ctx, readTS, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) +// ValidateSnapshotReadTS strictly validates that readTS does not exceed the PD timestamp. +// For read requests to the storage, the check can be implicitly performed when sending the RPC request. So this +// function is only needed when it's not proper to delay the check to when RPC requests are being sent (e.g., `BEGIN` +// statements that don't make reading operation immediately). +func ValidateSnapshotReadTS(ctx context.Context, store kv.Storage, readTS uint64, isStaleRead bool) error { + return store.GetOracle().ValidateReadTS(ctx, readTS, isStaleRead, &oracle.Option{TxnScope: oracle.GlobalTxnScope}) } diff --git a/pkg/sessiontxn/staleread/processor.go b/pkg/sessiontxn/staleread/processor.go index e1e87b958c345..27c10f3d18c50 100644 --- a/pkg/sessiontxn/staleread/processor.go +++ b/pkg/sessiontxn/staleread/processor.go @@ -286,7 +286,7 @@ func parseAndValidateAsOf(ctx context.Context, sctx sessionctx.Context, asOf *as return 0, err } - if err = sessionctx.ValidateSnapshotReadTS(ctx, sctx.GetStore(), ts); err != nil { + if err = sessionctx.ValidateSnapshotReadTS(ctx, sctx.GetStore(), ts, true); err != nil { return 0, err } diff --git a/pkg/sessiontxn/staleread/util.go b/pkg/sessiontxn/staleread/util.go index 57320ddbbf274..c44df2322dbd6 100644 --- a/pkg/sessiontxn/staleread/util.go +++ b/pkg/sessiontxn/staleread/util.go @@ -81,7 +81,7 @@ func CalculateTsWithReadStaleness(ctx context.Context, sctx sessionctx.Context, // If the final calculated exceeds the min safe ts, we are not sure whether the ts is safe to read (note that // reading with a ts larger than PD's max allocated ts + 1 is unsafe and may break linearizability). // So in this case, do an extra check on it. - err = sessionctx.ValidateSnapshotReadTS(ctx, sctx.GetStore(), readTS) + err = sessionctx.ValidateSnapshotReadTS(ctx, sctx.GetStore(), readTS, true) if err != nil { return 0, err } diff --git a/pkg/store/copr/BUILD.bazel b/pkg/store/copr/BUILD.bazel index 54aee8bf1a81d..df5dd3cb2636d 100644 --- a/pkg/store/copr/BUILD.bazel +++ b/pkg/store/copr/BUILD.bazel @@ -55,6 +55,7 @@ go_library( "@com_github_tikv_client_go_v2//config", "@com_github_tikv_client_go_v2//error", "@com_github_tikv_client_go_v2//metrics", + "@com_github_tikv_client_go_v2//oracle", "@com_github_tikv_client_go_v2//tikv", "@com_github_tikv_client_go_v2//tikvrpc", "@com_github_tikv_client_go_v2//tikvrpc/interceptor", diff --git a/pkg/store/copr/batch_coprocessor.go b/pkg/store/copr/batch_coprocessor.go index 5a1847e99ae86..c526e57146b84 100644 --- a/pkg/store/copr/batch_coprocessor.go +++ b/pkg/store/copr/batch_coprocessor.go @@ -1320,7 +1320,7 @@ func (b *batchCopIterator) retryBatchCopTask(ctx context.Context, bo *backoff.Ba const TiFlashReadTimeoutUltraLong = 3600 * time.Second func (b *batchCopIterator) handleTaskOnce(ctx context.Context, bo *backoff.Backoffer, task *batchCopTask) ([]*batchCopTask, error) { - sender := NewRegionBatchRequestSender(b.store.GetRegionCache(), b.store.GetTiKVClient(), b.enableCollectExecutionInfo) + sender := NewRegionBatchRequestSender(b.store.GetRegionCache(), b.store.GetTiKVClient(), b.store.store.GetOracle(), b.enableCollectExecutionInfo) var regionInfos = make([]*coprocessor.RegionInfo, 0, len(task.regionInfos)) for _, ri := range task.regionInfos { regionInfos = append(regionInfos, ri.toCoprocessorRegionInfo()) diff --git a/pkg/store/copr/batch_request_sender.go b/pkg/store/copr/batch_request_sender.go index ccb138f7753c3..5c6d9a6cbe192 100644 --- a/pkg/store/copr/batch_request_sender.go +++ b/pkg/store/copr/batch_request_sender.go @@ -23,6 +23,7 @@ import ( "github.com/pingcap/kvproto/pkg/metapb" "github.com/pingcap/tidb/pkg/config" tikverr "github.com/tikv/client-go/v2/error" + "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/tikvrpc" "google.golang.org/grpc/codes" @@ -56,9 +57,9 @@ type RegionBatchRequestSender struct { } // NewRegionBatchRequestSender creates a RegionBatchRequestSender object. -func NewRegionBatchRequestSender(cache *RegionCache, client tikv.Client, enableCollectExecutionInfo bool) *RegionBatchRequestSender { +func NewRegionBatchRequestSender(cache *RegionCache, client tikv.Client, oracle oracle.Oracle, enableCollectExecutionInfo bool) *RegionBatchRequestSender { return &RegionBatchRequestSender{ - RegionRequestSender: tikv.NewRegionRequestSender(cache.RegionCache, client), + RegionRequestSender: tikv.NewRegionRequestSender(cache.RegionCache, client, oracle), enableCollectExecutionInfo: enableCollectExecutionInfo, } } diff --git a/pkg/store/copr/mpp.go b/pkg/store/copr/mpp.go index 32c098aa35cf2..d0fdcaa0bd255 100644 --- a/pkg/store/copr/mpp.go +++ b/pkg/store/copr/mpp.go @@ -139,7 +139,7 @@ func (c *MPPClient) DispatchMPPTask(param kv.DispatchMPPTaskParam) (resp *mpp.Di // Or else it's the task without region, which always happens in high layer task without table. // In that case if originalTask != nil { - sender := NewRegionBatchRequestSender(c.store.GetRegionCache(), c.store.GetTiKVClient(), param.EnableCollectExecutionInfo) + sender := NewRegionBatchRequestSender(c.store.GetRegionCache(), c.store.GetTiKVClient(), c.store.store.GetOracle(), param.EnableCollectExecutionInfo) rpcResp, retry, _, err = sender.SendReqToAddr(bo, originalTask.ctx, originalTask.regionInfos, wrappedReq, tikv.ReadTimeoutMedium) // No matter what the rpc error is, we won't retry the mpp dispatch tasks. // TODO: If we want to retry, we must redo the plan fragment cutting and task scheduling. diff --git a/pkg/util/mock/BUILD.bazel b/pkg/util/mock/BUILD.bazel index dba359b92def6..28c64a80ad7df 100644 --- a/pkg/util/mock/BUILD.bazel +++ b/pkg/util/mock/BUILD.bazel @@ -35,6 +35,7 @@ go_library( "//pkg/util", "//pkg/util/chunk", "//pkg/util/disk", + "//pkg/util/logutil", "//pkg/util/memory", "//pkg/util/ranger/context", "//pkg/util/sli", diff --git a/pkg/util/mock/context.go b/pkg/util/mock/context.go index afc006423f290..ba79e993754d7 100644 --- a/pkg/util/mock/context.go +++ b/pkg/util/mock/context.go @@ -46,6 +46,7 @@ import ( "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/chunk" "github.com/pingcap/tidb/pkg/util/disk" + "github.com/pingcap/tidb/pkg/util/logutil" "github.com/pingcap/tidb/pkg/util/memory" rangerctx "github.com/pingcap/tidb/pkg/util/ranger/context" "github.com/pingcap/tidb/pkg/util/sli" @@ -86,7 +87,7 @@ type wrapTxn struct { } func (txn *wrapTxn) validOrPending() bool { - return txn.tsFuture != nil || txn.Transaction.Valid() + return txn.tsFuture != nil || (txn.Transaction != nil && txn.Transaction.Valid()) } func (txn *wrapTxn) pending() bool { @@ -130,23 +131,6 @@ func (txn *wrapTxn) GetTableInfo(id int64) *model.TableInfo { return txn.Transaction.GetTableInfo(id) } -// SetDiskFullOpt implements the interface. -func (*wrapTxn) SetDiskFullOpt(_ kvrpcpb.DiskFullOpt) {} - -// SetOption implements the interface. -func (*wrapTxn) SetOption(_ int, _ any) {} - -// StartTS implements the interface. -func (*wrapTxn) StartTS() uint64 { return uint64(time.Now().UnixNano()) } - -// Get implements the interface. -func (txn *wrapTxn) Get(ctx context.Context, k kv.Key) ([]byte, error) { - if txn.Transaction == nil { - return nil, nil - } - return txn.Transaction.Get(ctx, k) -} - // Execute implements sqlexec.SQLExecutor Execute interface. func (*Context) Execute(_ context.Context, _ string) ([]sqlexec.RecordSet, error) { return nil, errors.Errorf("Not Supported") @@ -317,7 +301,15 @@ func (c *Context) GetBuildPBCtx() *planctx.BuildPBContext { } // Txn implements sessionctx.Context Txn interface. -func (c *Context) Txn(bool) (kv.Transaction, error) { +func (c *Context) Txn(active bool) (kv.Transaction, error) { + if active { + if !c.txn.validOrPending() { + err := c.newTxn(context.Background()) + if err != nil { + return nil, err + } + } + } return &c.txn, nil } @@ -394,10 +386,12 @@ func (c *Context) GetSessionPlanCache() sessionctx.SessionPlanCache { return c.pcache } -// NewTxn implements the sessionctx.Context interface. -func (c *Context) NewTxn(context.Context) error { +// newTxn Creates new transaction on the session context. +func (c *Context) newTxn(ctx context.Context) error { if c.Store == nil { - return errors.New("store is not set") + logutil.Logger(ctx).Warn("mock.Context: No store is specified when trying to create new transaction. A fake transaction will be created. Note that this is unrecommended usage.") + c.fakeTxn() + return nil } if c.txn.Valid() { err := c.txn.Commit(c.ctx) @@ -414,14 +408,41 @@ func (c *Context) NewTxn(context.Context) error { return nil } -// NewStaleTxnWithStartTS implements the sessionctx.Context interface. -func (c *Context) NewStaleTxnWithStartTS(ctx context.Context, _ uint64) error { - return c.NewTxn(ctx) +// fakeTxn is used to let some tests pass in the context without an available kv.Storage. Once usages to access +// transactions without a kv.Storage are removed, this type should also be removed. +// New code should never use this. +type fakeTxn struct { + // The inner should always be nil. + kv.Transaction + startTS uint64 +} + +func (t *fakeTxn) StartTS() uint64 { + return t.startTS +} + +func (*fakeTxn) SetDiskFullOpt(_ kvrpcpb.DiskFullOpt) {} + +func (*fakeTxn) SetOption(_ int, _ any) {} + +func (*fakeTxn) Get(ctx context.Context, _ kv.Key) ([]byte, error) { + // Check your implementation if you meet this error. It's dangerous if some calculation relies on the data but the + // read result is faked. + logutil.Logger(ctx).Warn("mock.Context: No store is specified but trying to access data from a transaction.") + return nil, nil +} + +func (*fakeTxn) Valid() bool { return true } + +func (c *Context) fakeTxn() { + c.txn.Transaction = &fakeTxn{ + startTS: 1, + } } // RefreshTxnCtx implements the sessionctx.Context interface. func (c *Context) RefreshTxnCtx(ctx context.Context) error { - return errors.Trace(c.NewTxn(ctx)) + return errors.Trace(c.newTxn(ctx)) } // RollbackTxn indicates an expected call of RollbackTxn. diff --git a/pkg/util/mock/fortest.go b/pkg/util/mock/fortest.go index ac95f1555b5f4..87a301e2a6ad9 100644 --- a/pkg/util/mock/fortest.go +++ b/pkg/util/mock/fortest.go @@ -21,6 +21,8 @@ package mock // NewContext creates a new mocked sessionctx.Context. // This function should only be used for testing. +// Avoid using this when you are in a context with a `kv.Storage` instance, especially when you are going to access +// the data in it. Consider using testkit.NewSession(t, store) instead when possible. func NewContext() *Context { return newContext() }