@@ -39,6 +39,7 @@ import (
39
39
"github.com/pingcap/tidb/pkg/disttask/framework/proto"
40
40
"github.com/pingcap/tidb/pkg/disttask/framework/scheduler"
41
41
"github.com/pingcap/tidb/pkg/disttask/framework/storage"
42
+ "github.com/pingcap/tidb/pkg/errctx"
42
43
"github.com/pingcap/tidb/pkg/infoschema"
43
44
"github.com/pingcap/tidb/pkg/kv"
44
45
"github.com/pingcap/tidb/pkg/lightning/common"
@@ -50,7 +51,6 @@ import (
50
51
"github.com/pingcap/tidb/pkg/parser/mysql"
51
52
"github.com/pingcap/tidb/pkg/parser/terror"
52
53
"github.com/pingcap/tidb/pkg/sessionctx"
53
- "github.com/pingcap/tidb/pkg/sessionctx/stmtctx"
54
54
"github.com/pingcap/tidb/pkg/sessionctx/variable"
55
55
"github.com/pingcap/tidb/pkg/store/helper"
56
56
"github.com/pingcap/tidb/pkg/table"
@@ -61,6 +61,7 @@ import (
61
61
"github.com/pingcap/tidb/pkg/util/backoff"
62
62
"github.com/pingcap/tidb/pkg/util/chunk"
63
63
"github.com/pingcap/tidb/pkg/util/codec"
64
+ contextutil "github.com/pingcap/tidb/pkg/util/context"
64
65
"github.com/pingcap/tidb/pkg/util/dbterror"
65
66
tidblogutil "github.com/pingcap/tidb/pkg/util/logutil"
66
67
decoder "github.com/pingcap/tidb/pkg/util/rowDecoder"
@@ -1651,7 +1652,9 @@ type addIndexIngestWorker struct {
1651
1652
ctx context.Context
1652
1653
d * ddlCtx
1653
1654
metricCounter prometheus.Counter
1654
- sessCtx sessionctx.Context
1655
+ writeLoc * time.Location
1656
+ writeErrCtx errctx.Context
1657
+ writeStmtBufs variable.WriteStmtBufs
1655
1658
1656
1659
tbl table.PhysicalTable
1657
1660
indexes []table.Index
@@ -1666,17 +1669,20 @@ type addIndexIngestWorker struct {
1666
1669
func newAddIndexIngestWorker (
1667
1670
ctx context.Context ,
1668
1671
t table.PhysicalTable ,
1669
- d * ddlCtx ,
1672
+ info * reorgInfo ,
1670
1673
engines []ingest.Engine ,
1671
1674
resultCh chan * backfillResult ,
1672
1675
jobID int64 ,
1673
- schemaName string ,
1674
1676
indexIDs []int64 ,
1675
1677
writerID int ,
1676
1678
copReqSenderPool * copReqSenderPool ,
1677
- sessCtx sessionctx.Context ,
1678
1679
checkpointMgr * ingest.CheckpointManager ,
1679
1680
) (* addIndexIngestWorker , error ) {
1681
+ writeLoc , err := reorgTimeZoneWithTzLoc (info .ReorgMeta .Location )
1682
+ if err != nil {
1683
+ return nil , err
1684
+ }
1685
+
1680
1686
indexes := make ([]table.Index , 0 , len (indexIDs ))
1681
1687
writers := make ([]ingest.Writer , 0 , len (indexIDs ))
1682
1688
for i , indexID := range indexIDs {
@@ -1690,12 +1696,18 @@ func newAddIndexIngestWorker(
1690
1696
writers = append (writers , lw )
1691
1697
}
1692
1698
1699
+ writeErrCtx := errctx .NewContextWithLevels (
1700
+ reorgErrLevelsWithSQLMode (info .ReorgMeta .SQLMode ),
1701
+ contextutil .IgnoreWarn ,
1702
+ )
1703
+
1693
1704
return & addIndexIngestWorker {
1694
- ctx : ctx ,
1695
- d : d ,
1696
- sessCtx : sessCtx ,
1705
+ ctx : ctx ,
1706
+ d : info .d ,
1707
+ writeLoc : writeLoc ,
1708
+ writeErrCtx : writeErrCtx ,
1697
1709
metricCounter : metrics .BackfillTotalCounter .WithLabelValues (
1698
- metrics .GenerateReorgLabel ("add_idx_rate" , schemaName , t .Meta ().Name .O )),
1710
+ metrics .GenerateReorgLabel ("add_idx_rate" , info . SchemaName , t .Meta ().Name .O )),
1699
1711
tbl : t ,
1700
1712
indexes : indexes ,
1701
1713
writers : writers ,
@@ -1710,9 +1722,8 @@ func newAddIndexIngestWorker(
1710
1722
func (w * addIndexIngestWorker ) WriteLocal (rs * IndexRecordChunk ) (count int , nextKey kv.Key , err error ) {
1711
1723
oprStartTime := time .Now ()
1712
1724
copCtx := w .copReqSenderPool .copCtx
1713
- vars := w .sessCtx .GetSessionVars ()
1714
1725
cnt , lastHandle , err := writeChunkToLocal (
1715
- w .ctx , w .writers , w .indexes , copCtx , vars , rs .Chunk )
1726
+ w .ctx , w .writers , w .indexes , copCtx , w . writeLoc , w . writeErrCtx , & w . writeStmtBufs , rs .Chunk )
1716
1727
if err != nil || cnt == 0 {
1717
1728
return 0 , nil , err
1718
1729
}
@@ -1727,10 +1738,11 @@ func writeChunkToLocal(
1727
1738
writers []ingest.Writer ,
1728
1739
indexes []table.Index ,
1729
1740
copCtx copr.CopContext ,
1730
- vars * variable.SessionVars ,
1741
+ loc * time.Location ,
1742
+ errCtx errctx.Context ,
1743
+ writeStmtBufs * variable.WriteStmtBufs ,
1731
1744
copChunk * chunk.Chunk ,
1732
1745
) (int , kv.Handle , error ) {
1733
- sCtx , writeBufs := vars .StmtCtx , vars .GetWriteStmtBufs ()
1734
1746
iter := chunk .NewIterator4Chunk (copChunk )
1735
1747
c := copCtx .GetBase ()
1736
1748
@@ -1772,7 +1784,7 @@ func writeChunkToLocal(
1772
1784
restoreDataBuf [i ] = * datum .Clone ()
1773
1785
}
1774
1786
}
1775
- h , err := buildHandle (handleDataBuf , c .TableInfo , c .PrimaryKeyInfo , sCtx )
1787
+ h , err := buildHandle (handleDataBuf , c .TableInfo , c .PrimaryKeyInfo , loc , errCtx )
1776
1788
if err != nil {
1777
1789
return 0 , nil , errors .Trace (err )
1778
1790
}
@@ -1785,7 +1797,7 @@ func writeChunkToLocal(
1785
1797
if needRestoreForIndexes [i ] {
1786
1798
rsData = getRestoreData (c .TableInfo , copCtx .IndexInfo (idxID ), c .PrimaryKeyInfo , restoreDataBuf )
1787
1799
}
1788
- err = writeOneKVToLocal (ctx , writers [i ], index , sCtx , writeBufs , idxData , rsData , h )
1800
+ err = writeOneKVToLocal (ctx , writers [i ], index , loc , errCtx , writeStmtBufs , idxData , rsData , h )
1789
1801
if err != nil {
1790
1802
return 0 , nil , errors .Trace (err )
1791
1803
}
@@ -1811,12 +1823,13 @@ func writeOneKVToLocal(
1811
1823
ctx context.Context ,
1812
1824
writer ingest.Writer ,
1813
1825
index table.Index ,
1814
- sCtx * stmtctx.StatementContext ,
1826
+ loc * time.Location ,
1827
+ errCtx errctx.Context ,
1815
1828
writeBufs * variable.WriteStmtBufs ,
1816
1829
idxDt , rsData []types.Datum ,
1817
1830
handle kv.Handle ,
1818
1831
) error {
1819
- iter := index .GenIndexKVIter (sCtx . ErrCtx (), sCtx . TimeZone () , idxDt , handle , rsData )
1832
+ iter := index .GenIndexKVIter (errCtx , loc , idxDt , handle , rsData )
1820
1833
for iter .Valid () {
1821
1834
key , idxVal , _ , err := iter .Next (writeBufs .IndexKeyBuf , writeBufs .RowValBuf )
1822
1835
if err != nil {
0 commit comments