Fix race condition allowing multiple Tick() calls to a single Shard #409

Merged: 10 commits, Dec 13, 2017
Changes from 7 commits
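
As context for the diff below: the fix guards each shard's Tick() with an atomic flag so a concurrent caller bails out with an error instead of racing, and it tracks in-flight ticks with a WaitGroup so Close() can wait for them. The sketch below is illustrative only; it substitutes the standard library's sync/atomic for the uber-go/atomic Bool the shard actually uses, and the names guardedTicker and errAlreadyTicking are hypothetical.

```go
// Illustrative only: a tick guarded by an atomic flag plus a WaitGroup
// that lets Close wait for an in-flight tick. The real dbShard also
// checks a "closing" state so regular ticks terminate early during Close.
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

var errAlreadyTicking = errors.New("already ticking")

type guardedTicker struct {
	ticking int32 // 0 = idle, 1 = ticking
	tickWg  sync.WaitGroup
}

func (g *guardedTicker) Tick() error {
	g.tickWg.Add(1)
	defer g.tickWg.Done()
	// Ensure only one tick can execute at a time.
	if !atomic.CompareAndSwapInt32(&g.ticking, 0, 1) {
		return errAlreadyTicking
	}
	defer atomic.StoreInt32(&g.ticking, 0)
	// ... the per-series tick work would happen here ...
	return nil
}

func (g *guardedTicker) Close() {
	// Wait for any in-flight tick before releasing resources.
	g.tickWg.Wait()
}

func main() {
	var g guardedTicker
	fmt.Println(g.Tick()) // <nil>
	g.Close()
}
```

With two goroutines calling Tick() on the same value, one succeeds and the other gets errAlreadyTicking, which is the condition the mediator change below treats as back-pressure.
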
28 changes: 16 additions & 12 deletions Makefile
@@ -39,11 +39,11 @@ LINUX_AMD64_ENV := GOOS=linux GOARCH=amd64 CGO_ENABLED=0
SERVICES := \
m3dbnode

TOOLS := \
read_ids \
read_index_ids \
clone_fileset \
dtest \
TOOLS := \
read_ids \
read_index_ids \
clone_fileset \
dtest \
verify_commitlogs \

.PHONY: setup
@@ -94,17 +94,21 @@ all: lint metalint test-ci-unit test-ci-integration services tools
@echo Made all successfully

.PHONY: install-license-bin
install-license-bin: install-vendor
install-license-bin:
@echo Installing node modules
git submodule update --init --recursive
[ -d $(license_node_modules) ] || (cd $(license_dir) && npm install)
[ -d $(license_node_modules) ] || ( \
git submodule update --init --recursive && \
cd $(license_dir) && npm install \
)

.PHONY: install-mockgen
install-mockgen: install-vendor
install-mockgen:
@echo Installing mockgen
rm -rf $(gopath_prefix)/$(mockgen_package) && \
cp -r $(vendor_prefix)/$(mockgen_package) $(gopath_prefix)/$(mockgen_package) && \
go install $(mockgen_package)
@which mockgen >/dev/null || (make install-vendor && \
rm -rf $(gopath_prefix)/$(mockgen_package) && \
cp -r $(vendor_prefix)/$(mockgen_package) $(gopath_prefix)/$(mockgen_package) && \
go install $(mockgen_package) \
)

.PHONY: install-thrift-bin
install-thrift-bin: install-vendor install-glide
6 changes: 3 additions & 3 deletions glide.lock

Some generated files are not rendered by default.

3 changes: 3 additions & 0 deletions glide.yaml
@@ -89,6 +89,9 @@ import:
- package: github.com/spf13/pflag
version: 4f9190456aed1c2113ca51ea9b89219747458dc1

- package: go.uber.org/atomic

Collaborator: Do we need to pull this in anymore?

Collaborator (Author): doh, I remember thinking I'll forget to revert that :|

version: ^1.3.0

# NB(prateek): ideally, the following dependencies would be under testImport, but
# Glide doesn't like that. https://github.com/Masterminds/glide/issues/597

3 changes: 3 additions & 0 deletions storage/mediator.go
@@ -177,6 +177,9 @@ func (m *mediator) ongoingTick() {
err := m.Tick(asyncRun, noForce)
if err == errTickInProgress {
m.sleepFn(tickCheckInterval)
} else if err != nil {
log := m.opts.InstrumentOptions().Logger()
log.Errorf("error within ongoingTick: %v", err)
}
}
}
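
The mediator change above keeps treating errTickInProgress as expected back-pressure (sleep and retry) and now logs any other tick error instead of silently dropping it. Below is a reduced, hypothetical stand-in for that loop; the stop channel, the tick callback, and the tickCheckInterval value are invented for the sketch.

```go
package main

import (
	"errors"
	"log"
	"time"
)

var errTickInProgress = errors.New("tick already in progress")

// ongoingTickLoop is a reduced stand-in for the mediator's ongoingTick:
// errTickInProgress only triggers a short sleep before retrying, while
// any other error is surfaced to the logger.
func ongoingTickLoop(tick func() error, stop <-chan struct{}) {
	const tickCheckInterval = 10 * time.Millisecond // assumed value
	for {
		select {
		case <-stop:
			return
		default:
		}
		if err := tick(); err == errTickInProgress {
			time.Sleep(tickCheckInterval)
		} else if err != nil {
			log.Printf("error within ongoingTick: %v", err)
		}
	}
}

func main() {
	stop := make(chan struct{})
	go ongoingTickLoop(func() error { return errTickInProgress }, stop)
	time.Sleep(50 * time.Millisecond)
	close(stop)
}
```
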
19 changes: 12 additions & 7 deletions storage/namespace.go
@@ -384,18 +384,19 @@ func (n *dbNamespace) closeShards(shards []databaseShard, blockUntilClosed bool)
}
}

func (n *dbNamespace) Tick(c context.Cancellable) {
func (n *dbNamespace) Tick(c context.Cancellable) error {
shards := n.GetOwnedShards()

if len(shards) == 0 {
return
return nil
}

// Tick through the shards sequentially to avoid parallel data flushes
var (
r tickResult
l sync.Mutex
wg sync.WaitGroup
r tickResult
multiErr xerrors.MultiError
l sync.Mutex
wg sync.WaitGroup
)
for _, shard := range shards {
shard := shard
@@ -406,18 +407,20 @@ func (n *dbNamespace) Tick(c context.Cancellable) {
if c.IsCancelled() {
return
}
shardResult := shard.Tick(c)

shardResult, err := shard.Tick(c)

l.Lock()
r = r.merge(shardResult)
multiErr = multiErr.Add(err)
l.Unlock()
})
}

wg.Wait()

if c.IsCancelled() {
return
return multiErr.FinalError()
}

n.statsLastTick.Lock()
@@ -435,6 +438,8 @@ func (n *dbNamespace) Tick(c context.Cancellable) {
n.metrics.tick.madeUnwiredBlocks.Inc(int64(r.madeUnwiredBlocks))
n.metrics.tick.mergedOutOfOrderBlocks.Inc(int64(r.mergedOutOfOrderBlocks))
n.metrics.tick.errors.Inc(int64(r.errors))

return multiErr.FinalError()

Collaborator: I don't think we want to emit stats or set the last tick stats unless err == nil.

Collaborator (Author): fixed up

}

func (n *dbNamespace) Write(
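
The namespace change above merges each shard's tick result and accumulates per-shard errors into an xerrors.MultiError under a mutex, returning FinalError() so one failing shard is reported without discarding the others' results. The self-contained sketch below shows the same fan-out-and-aggregate pattern, substituting errors.Join (Go 1.20+) for m3x's MultiError; the tickResult type and tickShards helper here are stand-ins, not the repo's code.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// tickResult is a stand-in for the namespace's per-shard tick summary.
type tickResult struct{ activeSeries, expiredSeries int }

func (r tickResult) merge(o tickResult) tickResult {
	return tickResult{r.activeSeries + o.activeSeries, r.expiredSeries + o.expiredSeries}
}

// tickShards fans out over the shards, merging results and collecting
// errors under a mutex, and returns a single aggregated error.
func tickShards(shards []func() (tickResult, error)) (tickResult, error) {
	var (
		r    tickResult
		errs []error
		l    sync.Mutex
		wg   sync.WaitGroup
	)
	for _, tick := range shards {
		tick := tick
		wg.Add(1)
		go func() {
			defer wg.Done()
			res, err := tick()
			l.Lock()
			r = r.merge(res)
			if err != nil {
				errs = append(errs, err)
			}
			l.Unlock()
		}()
	}
	wg.Wait()
	return r, errors.Join(errs...) // nil when every shard ticked cleanly
}

func main() {
	shards := []func() (tickResult, error){
		func() (tickResult, error) { return tickResult{activeSeries: 3}, nil },
		func() (tickResult, error) { return tickResult{}, errors.New("shard 1: tick failed") },
	}
	r, err := tickShards(shards)
	fmt.Println(r, err)
}
```
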
25 changes: 23 additions & 2 deletions storage/namespace_test.go
@@ -84,12 +84,33 @@ func TestNamespaceTick(t *testing.T) {
ns := newTestNamespace(t)
for i := range testShardIDs {
shard := NewMockdatabaseShard(ctrl)
shard.EXPECT().Tick(context.NewNoOpCanncellable())
shard.EXPECT().Tick(context.NewNoOpCanncellable()).Return(tickResult{}, nil)
ns.shards[testShardIDs[i].ID()] = shard
}

// Only asserting the expected methods are called
ns.Tick(context.NewNoOpCanncellable())
require.NoError(t, ns.Tick(context.NewNoOpCanncellable()))
}

func TestNamespaceTickError(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

fakeErr := errors.New("fake error")
ns := newTestNamespace(t)
for i := range testShardIDs {
shard := NewMockdatabaseShard(ctrl)
if i == 0 {
shard.EXPECT().Tick(context.NewNoOpCanncellable()).Return(tickResult{}, fakeErr)
} else {
shard.EXPECT().Tick(context.NewNoOpCanncellable()).Return(tickResult{}, nil)
}
ns.shards[testShardIDs[i].ID()] = shard
}

err := ns.Tick(context.NewNoOpCanncellable())
require.NotNil(t, err)
require.Equal(t, fakeErr.Error(), err.Error())
}

func TestNamespaceWriteShardNotOwned(t *testing.T) {
85 changes: 72 additions & 13 deletions storage/shard.go
@@ -47,6 +47,7 @@ import (
xerrors "github.com/m3db/m3x/errors"
xtime "github.com/m3db/m3x/time"

uatomic "github.com/uber-go/atomic"
"github.com/uber-go/tally"
)

@@ -56,8 +57,10 @@ const (
)

var (
errShardEntryNotFound = errors.New("shard entry not found")
errShardNotOpen = errors.New("shard is not open")
errShardEntryNotFound = errors.New("shard entry not found")
errShardNotOpen = errors.New("shard is not open")
errShardAlreadyTicking = errors.New("shard is already ticking")
errShardClosingTickTerminated = errors.New("shard is closing, terminating tick")
)

type filesetBeforeFn func(filePathPrefix string, namespace ts.ID, shardID uint32, t time.Time) ([]string, error)
@@ -66,7 +69,7 @@ type tickPolicy int

const (
tickPolicyRegular tickPolicy = iota
tickPolicyForceExpiry
tickPolicyCloseShard
)

type dbShardState int
@@ -100,6 +103,8 @@ type dbShard struct {
identifierPool ts.IdentifierPool
contextPool context.Pool
flushState shardFlushState
ticking *uatomic.Bool
tickWg *sync.WaitGroup
runtimeOptsListenClosers []xclose.SimpleCloser
currRuntimeOptions dbShardRuntimeOptions
metrics dbShardMetrics
@@ -206,6 +211,8 @@ func newDatabaseShard(
identifierPool: opts.IdentifierPool(),
contextPool: opts.ContextPool(),
flushState: newShardFlushState(),
ticking: uatomic.NewBool(false),
tickWg: &sync.WaitGroup{},
metrics: newDatabaseShardMetrics(scope),
}
d.insertQueue = newDatabaseShardInsertQueue(d.insertSeriesBatch,
@@ -360,30 +367,74 @@ func (s *dbShard) Close() error {
stopwatch.Stop()
}()

// NB(prateek): wait till any existing ticks are finished. In the usual
// case, no other ticks are running, and tickWg count is at 0, so the
// call to Wait() will return immediately.
// In the case when there is an existing Tick running, the count for
// tickWg will be > 0, and we'll wait until it's reset to zero, which
// will happen because earlier in this function we set the shard state
// to dbShardStateClosing, which triggers an early termination of
// any active ticks.
s.tickWg.Wait()

// NB(r): Asynchronously we purge expired series to ensure pressure on the
// GC is not placed all at one time. If the deadline is too low and still
// causes the GC to impact performance when closing shards the deadline
// should be increased.
cancellable := context.NewNoOpCanncellable()
s.tickAndExpire(cancellable, tickPolicyForceExpiry)
_, err := s.tickAndExpire(cancellable, tickPolicyCloseShard)
return err
}

return nil
func (s *dbShard) isClosing() bool {
s.RLock()
state := s.state
s.RUnlock()
return state == dbShardStateClosing
}

func (s *dbShard) Tick(c context.Cancellable) tickResult {
func (s *dbShard) Tick(c context.Cancellable) (tickResult, error) {
s.removeAnyFlushStatesTooEarly()
return s.tickAndExpire(c, tickPolicyRegular)
}

func (s *dbShard) tickAndExpire(
c context.Cancellable,
policy tickPolicy,
) tickResult {
) (tickResult, error) {
terminateDueToClosing := func() bool {
// NB(prateek): we bail out early if the shard is closing,
// unless it's the final tick issued during the Close(). This
// final tick is required to release resources back to our pools.
return policy != tickPolicyCloseShard && s.isClosing()
}

if terminateDueToClosing() {
return tickResult{}, errShardClosingTickTerminated
}

// ensure only one tick can execute at a time
if s.ticking.Swap(true) {
// i.e. we were previously ticking
return tickResult{}, errShardAlreadyTicking
}

Collaborator (@robskillington, Dec 13, 2017): If the tickPolicy is tickPolicyRegular and s.isClosing(), can we also return err? i.e. don't let regular ticks start if closing.

Collaborator (Author): sure thing

Collaborator (Author): done


// enable Close() to track the lifecycle of the tick
s.tickWg.Add(1)

defer func() {
// reset ticking state
s.ticking.Store(false)
// indicate we're done ticking
s.tickWg.Done()
}()

var (
r tickResult
expired []series.DatabaseSeries
i int
slept time.Duration
r tickResult
expired []series.DatabaseSeries
terminatedTickingDueToClosing bool
i int
slept time.Duration
)
s.RLock()
tickSleepBatch := s.currRuntimeOptions.tickSleepSeriesBatchSize
@@ -397,6 +448,10 @@
if c.IsCancelled() {
return false
}
if terminateDueToClosing() {
terminatedTickingDueToClosing = true
return false
}
// Throttle the tick
sleepFor := time.Duration(tickSleepBatch) * tickSleepPerSeries
s.sleepFn(sleepFor)
@@ -409,7 +464,7 @@
switch policy {
case tickPolicyRegular:
result, err = entry.series.Tick()
case tickPolicyForceExpiry:
case tickPolicyCloseShard:
err = series.ErrSeriesAllDatapointsExpired
}
if err == series.ErrSeriesAllDatapointsExpired {
@@ -449,7 +504,11 @@
s.purgeExpiredSeries(expired)
}

return r
if terminatedTickingDueToClosing {
return tickResult{}, errShardClosingTickTerminated
}

return r, nil
}

func (s *dbShard) purgeExpiredSeries(expired []series.DatabaseSeries) {
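
Besides the one-tick-at-a-time guard, tickAndExpire above throttles its scan (sleeping once per tickSleepSeriesBatchSize series) and re-checks on every batch whether the shard has started closing, so a regular tick terminates early and the final cleanup tick is left to Close() via tickPolicyCloseShard. The sketch below shows that batched, cancellable scan pattern; scanSeries, batchSize, and sleepPerBatch are invented for illustration and are not the shard's actual names or values.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// scanSeries is a hypothetical, reduced version of the shard's throttled
// scan: every batch it sleeps to limit CPU/GC pressure and checks a
// closing flag so the scan can bail out early.
func scanSeries(n int, closing *int32) (processed int, terminated bool) {
	const (
		batchSize     = 128                    // stands in for tickSleepSeriesBatchSize
		sleepPerBatch = 100 * time.Microsecond // stands in for the per-batch throttle
	)
	for i := 0; i < n; i++ {
		if i > 0 && i%batchSize == 0 {
			if atomic.LoadInt32(closing) == 1 {
				return processed, true // bail out: Close() owns the final tick
			}
			time.Sleep(sleepPerBatch) // throttle the tick
		}
		processed++ // stand-in for entry.series.Tick()
	}
	return processed, false
}

func main() {
	var closing int32
	n, terminated := scanSeries(1000, &closing)
	fmt.Println(n, terminated)
}
```
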