Skip to content

Commit

Permalink
feat: for AlreadyExist error during Flush, attach the value from flus…
Browse files Browse the repository at this point in the history
…hingMemDB to it

Signed-off-by: ekexium <eke@fastmail.com>
  • Loading branch information
ekexium committed Mar 12, 2024
1 parent 3e419e6 commit eeb30c3
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 3 deletions.
5 changes: 5 additions & 0 deletions error/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,11 @@ func (d *PDError) Error() string {
// ErrKeyExist wraps *pdpb.AlreadyExist to implement the error interface.
type ErrKeyExist struct {
*kvrpcpb.AlreadyExist
// Value stores the value that is being written when the conflict is detected.
// It was supposed to be an Insert mutation so the value should exist.
// The value is not in the kv protocol,
// it is introduced for error message handling in pipeliend-DML.
Value []byte
}

func (k *ErrKeyExist) Error() string {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ require (
golang.org/x/net v0.22.0 // indirect
golang.org/x/sys v0.18.0 // indirect
golang.org/x/text v0.14.0 // indirect
golang.org/x/tools v0.19.0 // indirect
golang.org/x/tools v0.18.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240304212257-790db918fca8 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240308144416-29370a3891b7 // indirect
google.golang.org/protobuf v1.33.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,8 @@ golang.org/x/tools v0.0.0-20191108193012-7d206e10da11/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.19.0 h1:tfGCXNR1OsFG+sVdLAitlpjAvD/I6dHDKnYrpEZUHkw=
golang.org/x/tools v0.19.0/go.mod h1:qoJWxmGSIBmAeriMx19ogtrEPrGtDbPK634QFIcLAhc=
golang.org/x/tools v0.18.0 h1:k8NLag8AGHnn+PHbl7g43CtqZAwG60vZkLqgyZgIHgQ=
golang.org/x/tools v0.18.0/go.mod h1:GL7B4CwcLLeo59yx/9UWWuNOW1n3VZ4f5axWfML7Lcg=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
25 changes: 25 additions & 0 deletions internal/unionstore/pipelined_memdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,18 @@ package unionstore

import (
"context"
stderrors "errors"
"sync"
"sync/atomic"
"time"

"github.com/pingcap/errors"
tikverr "github.com/tikv/client-go/v2/error"
"github.com/tikv/client-go/v2/internal/logutil"
"github.com/tikv/client-go/v2/kv"
"github.com/tikv/client-go/v2/metrics"
"github.com/tikv/client-go/v2/util"
"go.uber.org/zap"
)

// PipelinedMemDB is a Store which contains
Expand Down Expand Up @@ -216,6 +219,8 @@ func (p *PipelinedMemDB) Flush(force bool) (bool, error) {
}
if p.flushingMemDB != nil {
if err := <-p.errCh; err != nil {
err = p.handleAlreadyExistErr(err)
p.flushingMemDB = nil
return false, err
}
}
Expand Down Expand Up @@ -261,13 +266,33 @@ func (p *PipelinedMemDB) needFlush() bool {
func (p *PipelinedMemDB) FlushWait() error {
if p.flushingMemDB != nil {
err := <-p.errCh
err = p.handleAlreadyExistErr(err)
// cleanup the flushingMemDB so the next call of FlushWait will not wait for the error channel.
p.flushingMemDB = nil
return err
}
return nil
}

func (p *PipelinedMemDB) handleAlreadyExistErr(err error) error {
var existErr *tikverr.ErrKeyExist
if stderrors.As(err, &existErr) {
v, err2 := p.flushingMemDB.Get(existErr.GetKey())
if err2 != nil {
// TODO: log more info like start_ts, also for other logs
logutil.BgLogger().Warn(
"[pipelined-dml] Getting value from flushingMemDB when"+
" AlreadyExist error occurs failed", zap.Error(err2),
zap.Uint64("generation", p.generation),
)
} else {
existErr.Value = v
}
return existErr
}
return err
}

// Iter implements the Retriever interface.
func (p *PipelinedMemDB) Iter([]byte, []byte) (Iterator, error) {
return nil, errors.New("pipelined memdb does not support Iter")
Expand Down

0 comments on commit eeb30c3

Please sign in to comment.