Skip to content

Commit

Permalink
fix: check kill signal against 0, so that all kill signals will not b…
Browse files Browse the repository at this point in the history
…e ignored.

Signed-off-by: ekexium <eke@fastmail.com>
  • Loading branch information
ekexium committed Jan 3, 2024
1 parent b50b151 commit a17c279
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 8 deletions.
15 changes: 11 additions & 4 deletions pkg/executor/internal/mpp/local_mpp_coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,10 +668,17 @@ func (c *localMppCoordinator) nextImpl(ctx context.Context) (resp *mppResponse,
case resp, ok = <-c.respChan:
return
case <-ticker.C:
if c.vars != nil && c.vars.Killed != nil && atomic.LoadUint32(c.vars.Killed) == 1 {
err = derr.ErrQueryInterrupted
exit = true
return
if c.vars != nil && c.vars.Killed != nil {
killed := atomic.LoadUint32(c.vars.Killed)
if killed != 0 {
logutil.Logger(ctx).Info(
"a killed signal is received",
zap.Uint32("signal", killed),
)
err = derr.ErrQueryInterrupted
exit = true
return
}
}
case <-c.finishCh:
exit = true
Expand Down
7 changes: 6 additions & 1 deletion pkg/store/copr/batch_coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1167,7 +1167,12 @@ func (b *batchCopIterator) recvFromRespCh(ctx context.Context) (resp *batchCopRe
case resp, ok = <-b.respChan:
return
case <-ticker.C:
if atomic.LoadUint32(b.vars.Killed) == 1 {
killed := atomic.LoadUint32(b.vars.Killed)
if killed != 0 {
logutil.Logger(ctx).Info(
"a killed signal is received",
zap.Uint32("signal", killed),
)
resp = &batchCopResponse{err: derr.ErrQueryInterrupted}
ok = true
return
Expand Down
18 changes: 15 additions & 3 deletions pkg/store/copr/coprocessor.go
Original file line number Diff line number Diff line change
Expand Up @@ -937,7 +937,12 @@ func (it *copIterator) recvFromRespCh(ctx context.Context, respCh <-chan *copRes
exit = true
return
case <-ticker.C:
if atomic.LoadUint32(it.vars.Killed) == 1 {
killed := atomic.LoadUint32(it.vars.Killed)
if killed != 0 {
logutil.Logger(ctx).Info(
"a killed signal is received",
zap.Uint32("signal", killed),
)
resp = &copResponse{err: derr.ErrQueryInterrupted}
ok = true
return
Expand Down Expand Up @@ -1862,8 +1867,15 @@ func (worker *copIteratorWorker) calculateRemain(ranges *KeyRanges, split *copro

// finished checks the flags and finished channel, it tells whether the worker is finished.
func (worker *copIteratorWorker) finished() bool {
if worker.vars != nil && worker.vars.Killed != nil && atomic.LoadUint32(worker.vars.Killed) == 1 {
return true
if worker.vars != nil && worker.vars.Killed != nil {
killed := atomic.LoadUint32(worker.vars.Killed)
if killed != 0 {
logutil.BgLogger().Info(
"a killed signal is received in copIteratorWorker",
zap.Uint32("signal", killed),
)
return true
}
}
select {
case <-worker.finishCh:
Expand Down

0 comments on commit a17c279

Please sign in to comment.