diff --git a/error/error.go b/error/error.go index 14f29a440c..adc9776c4d 100644 --- a/error/error.go +++ b/error/error.go @@ -141,6 +141,11 @@ func (d *PDError) Error() string { // ErrKeyExist wraps *pdpb.AlreadyExist to implement the error interface. type ErrKeyExist struct { *kvrpcpb.AlreadyExist + // Value stores the value that is being written when the conflict is detected. + // It was supposed to be an Insert mutation so the value should exist. + // The value is not in the kv protocol, + // it is introduced for error message handling in pipeliend-DML. + Value []byte } func (k *ErrKeyExist) Error() string { diff --git a/go.mod b/go.mod index 933b3b7ae8..cf05ac3e4a 100644 --- a/go.mod +++ b/go.mod @@ -52,7 +52,7 @@ require ( golang.org/x/net v0.22.0 // indirect golang.org/x/sys v0.18.0 // indirect golang.org/x/text v0.14.0 // indirect - golang.org/x/tools v0.19.0 // indirect + golang.org/x/tools v0.18.0 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20240304212257-790db918fca8 // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20240308144416-29370a3891b7 // indirect google.golang.org/protobuf v1.33.0 // indirect diff --git a/go.sum b/go.sum index 30a0e979cc..dee7498f9c 100644 --- a/go.sum +++ b/go.sum @@ -192,8 +192,8 @@ golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtn golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= -golang.org/x/tools v0.19.0 h1:tfGCXNR1OsFG+sVdLAitlpjAvD/I6dHDKnYrpEZUHkw= -golang.org/x/tools v0.19.0/go.mod h1:qoJWxmGSIBmAeriMx19ogtrEPrGtDbPK634QFIcLAhc= +golang.org/x/tools v0.18.0 h1:k8NLag8AGHnn+PHbl7g43CtqZAwG60vZkLqgyZgIHgQ= +golang.org/x/tools v0.18.0/go.mod h1:GL7B4CwcLLeo59yx/9UWWuNOW1n3VZ4f5axWfML7Lcg= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= diff --git a/internal/unionstore/pipelined_memdb.go b/internal/unionstore/pipelined_memdb.go index 0ac39a8bc9..cab6d30a33 100644 --- a/internal/unionstore/pipelined_memdb.go +++ b/internal/unionstore/pipelined_memdb.go @@ -16,15 +16,18 @@ package unionstore import ( "context" + stderrors "errors" "sync" "sync/atomic" "time" "github.com/pingcap/errors" tikverr "github.com/tikv/client-go/v2/error" + "github.com/tikv/client-go/v2/internal/logutil" "github.com/tikv/client-go/v2/kv" "github.com/tikv/client-go/v2/metrics" "github.com/tikv/client-go/v2/util" + "go.uber.org/zap" ) // PipelinedMemDB is a Store which contains @@ -216,6 +219,8 @@ func (p *PipelinedMemDB) Flush(force bool) (bool, error) { } if p.flushingMemDB != nil { if err := <-p.errCh; err != nil { + err = p.handleAlreadyExistErr(err) + p.flushingMemDB = nil return false, err } } @@ -261,6 +266,7 @@ func (p *PipelinedMemDB) needFlush() bool { func (p *PipelinedMemDB) FlushWait() error { if p.flushingMemDB != nil { err := <-p.errCh + err = p.handleAlreadyExistErr(err) // cleanup the flushingMemDB so the next call of FlushWait will not wait for the error channel. p.flushingMemDB = nil return err @@ -268,6 +274,25 @@ func (p *PipelinedMemDB) FlushWait() error { return nil } +func (p *PipelinedMemDB) handleAlreadyExistErr(err error) error { + var existErr *tikverr.ErrKeyExist + if stderrors.As(err, &existErr) { + v, err2 := p.flushingMemDB.Get(existErr.GetKey()) + if err2 != nil { + // TODO: log more info like start_ts, also for other logs + logutil.BgLogger().Warn( + "[pipelined-dml] Getting value from flushingMemDB when"+ + " AlreadyExist error occurs failed", zap.Error(err2), + zap.Uint64("generation", p.generation), + ) + } else { + existErr.Value = v + } + return existErr + } + return err +} + // Iter implements the Retriever interface. func (p *PipelinedMemDB) Iter([]byte, []byte) (Iterator, error) { return nil, errors.New("pipelined memdb does not support Iter")