
Commit

Fix race condition allowing multiple Tick() calls to a single Shard (#…
prateek authored Dec 13, 2017
1 parent 0e5c270 commit d21dce5
Showing 10 changed files with 368 additions and 83 deletions.
28 changes: 16 additions & 12 deletions Makefile
@@ -39,11 +39,11 @@ LINUX_AMD64_ENV := GOOS=linux GOARCH=amd64 CGO_ENABLED=0
SERVICES := \
m3dbnode

TOOLS := \
read_ids \
read_index_ids \
clone_fileset \
dtest \
TOOLS := \
read_ids \
read_index_ids \
clone_fileset \
dtest \
verify_commitlogs \

.PHONY: setup
@@ -94,17 +94,21 @@ all: lint metalint test-ci-unit test-ci-integration services tools
@echo Made all successfully

.PHONY: install-license-bin
install-license-bin: install-vendor
install-license-bin:
@echo Installing node modules
git submodule update --init --recursive
[ -d $(license_node_modules) ] || (cd $(license_dir) && npm install)
[ -d $(license_node_modules) ] || ( \
git submodule update --init --recursive && \
cd $(license_dir) && npm install \
)

.PHONY: install-mockgen
install-mockgen: install-vendor
install-mockgen:
@echo Installing mockgen
rm -rf $(gopath_prefix)/$(mockgen_package) && \
cp -r $(vendor_prefix)/$(mockgen_package) $(gopath_prefix)/$(mockgen_package) && \
go install $(mockgen_package)
@which mockgen >/dev/null || (make install-vendor && \
rm -rf $(gopath_prefix)/$(mockgen_package) && \
cp -r $(vendor_prefix)/$(mockgen_package) $(gopath_prefix)/$(mockgen_package) && \
go install $(mockgen_package) \
)

.PHONY: install-thrift-bin
install-thrift-bin: install-vendor install-glide
3 changes: 3 additions & 0 deletions storage/mediator.go
@@ -177,6 +177,9 @@ func (m *mediator) ongoingTick() {
err := m.Tick(asyncRun, noForce)
if err == errTickInProgress {
m.sleepFn(tickCheckInterval)
} else if err != nil {
log := m.opts.InstrumentOptions().Logger()
log.Errorf("error within ongoingTick: %v", err)
}
}
}
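For readers following the hunk above: ongoingTick treats errTickInProgress as a benign "try again shortly" signal and now logs every other error instead of silently dropping it. The snippet below is a minimal sketch of that loop shape, not the mediator's actual implementation; the name ongoingTickLoop, the injected tick/sleep/logf/done parameters, and the one-second tickCheckInterval are all illustrative assumptions, since the hunk only shows the error handling itself.

    package mediatorsketch

    import (
        "errors"
        "time"
    )

    // errTickInProgress stands in for the package-level sentinel that Tick
    // returns while another tick is still running.
    var errTickInProgress = errors.New("tick already in progress")

    // ongoingTickLoop mirrors the shape of mediator.ongoingTick: an
    // in-progress tick is retried after a short sleep, any other error is
    // logged, and the loop keeps running until done is closed.
    func ongoingTickLoop(
        tick func() error,
        sleep func(time.Duration),
        logf func(format string, args ...interface{}),
        done <-chan struct{},
    ) {
        const tickCheckInterval = time.Second // illustrative value only
        for {
            select {
            case <-done:
                return
            default:
            }
            if err := tick(); err == errTickInProgress {
                sleep(tickCheckInterval)
            } else if err != nil {
                logf("error within ongoingTick: %v", err)
            }
        }
    }

The real mediator gets the same pieces from its own fields, m.sleepFn and the logger from m.opts.InstrumentOptions(), rather than from parameters.
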
23 changes: 15 additions & 8 deletions storage/namespace.go
@@ -384,18 +384,19 @@ func (n *dbNamespace) closeShards(shards []databaseShard, blockUntilClosed bool)
}
}

func (n *dbNamespace) Tick(c context.Cancellable) {
func (n *dbNamespace) Tick(c context.Cancellable) error {
shards := n.GetOwnedShards()

if len(shards) == 0 {
return
return nil
}

// Tick through the shards sequentially to avoid parallel data flushes
var (
r tickResult
l sync.Mutex
wg sync.WaitGroup
r tickResult
multiErr xerrors.MultiError
l sync.Mutex
wg sync.WaitGroup
)
for _, shard := range shards {
shard := shard
@@ -406,18 +407,22 @@ func (n *dbNamespace) Tick(c context.Cancellable) {
if c.IsCancelled() {
return
}
shardResult := shard.Tick(c)

shardResult, err := shard.Tick(c)

l.Lock()
r = r.merge(shardResult)
multiErr = multiErr.Add(err)
l.Unlock()
})
}

wg.Wait()

if c.IsCancelled() {
return
// NB: we terminate early here to ensure we are not reporting metrics
// based on inaccurate/partial tick results.
if err := multiErr.FinalError(); err != nil || c.IsCancelled() {
return err
}

n.statsLastTick.Lock()
@@ -435,6 +440,8 @@ func (n *dbNamespace) Tick(c context.Cancellable) {
n.metrics.tick.madeUnwiredBlocks.Inc(int64(r.madeUnwiredBlocks))
n.metrics.tick.mergedOutOfOrderBlocks.Inc(int64(r.mergedOutOfOrderBlocks))
n.metrics.tick.errors.Inc(int64(r.errors))

return nil
}

func (n *dbNamespace) Write(
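The namespace-level change above is the error-propagation half of the fix: every shard's Tick error is folded into an xerrors.MultiError, and metrics are only reported when the combined error is nil and the tick was not cancelled. The sketch below shows the aggregation pattern in isolation. multiError, tickAll, and the shards-as-closures shape are illustrative assumptions; only the Add/FinalError surface is taken from the hunk, and the real xerrors.MultiError from the m3x library may differ in detail.

    package namespacesketch

    import (
        "errors"
        "strings"
        "sync"
    )

    // multiError mimics the Add/FinalError surface used by dbNamespace.Tick:
    // Add ignores nil errors, FinalError returns nil when nothing was added.
    type multiError struct {
        errs []error
    }

    func (m multiError) Add(err error) multiError {
        if err != nil {
            m.errs = append(m.errs, err)
        }
        return m
    }

    func (m multiError) FinalError() error {
        if len(m.errs) == 0 {
            return nil
        }
        msgs := make([]string, 0, len(m.errs))
        for _, err := range m.errs {
            msgs = append(msgs, err.Error())
        }
        return errors.New(strings.Join(msgs, "; "))
    }

    // tickAll ticks every shard concurrently, merges counts and errors under
    // a mutex, and reports a combined result only when no shard failed,
    // matching the early-return shape of the hunk above.
    func tickAll(shards []func() (int, error)) (int, error) {
        var (
            total    int
            l        sync.Mutex
            wg       sync.WaitGroup
            multiErr multiError
        )
        for _, tick := range shards {
            tick := tick
            wg.Add(1)
            go func() {
                defer wg.Done()
                n, tickErr := tick()
                l.Lock()
                total += n
                multiErr = multiErr.Add(tickErr)
                l.Unlock()
            }()
        }
        wg.Wait()
        if err := multiErr.FinalError(); err != nil {
            return 0, err
        }
        return total, nil
    }

The real Tick folds the cancellation check into the same early return, so a cancelled tick never reports partial metrics either.
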
25 changes: 23 additions & 2 deletions storage/namespace_test.go
@@ -84,12 +84,33 @@ func TestNamespaceTick(t *testing.T) {
ns := newTestNamespace(t)
for i := range testShardIDs {
shard := NewMockdatabaseShard(ctrl)
shard.EXPECT().Tick(context.NewNoOpCanncellable())
shard.EXPECT().Tick(context.NewNoOpCanncellable()).Return(tickResult{}, nil)
ns.shards[testShardIDs[i].ID()] = shard
}

// Only asserting the expected methods are called
ns.Tick(context.NewNoOpCanncellable())
require.NoError(t, ns.Tick(context.NewNoOpCanncellable()))
}

func TestNamespaceTickError(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

fakeErr := errors.New("fake error")
ns := newTestNamespace(t)
for i := range testShardIDs {
shard := NewMockdatabaseShard(ctrl)
if i == 0 {
shard.EXPECT().Tick(context.NewNoOpCanncellable()).Return(tickResult{}, fakeErr)
} else {
shard.EXPECT().Tick(context.NewNoOpCanncellable()).Return(tickResult{}, nil)
}
ns.shards[testShardIDs[i].ID()] = shard
}

err := ns.Tick(context.NewNoOpCanncellable())
require.NotNil(t, err)
require.Equal(t, fakeErr.Error(), err.Error())
}

func TestNamespaceWriteShardNotOwned(t *testing.T) {
92 changes: 79 additions & 13 deletions storage/shard.go
@@ -56,8 +56,10 @@ const (
)

var (
errShardEntryNotFound = errors.New("shard entry not found")
errShardNotOpen = errors.New("shard is not open")
errShardEntryNotFound = errors.New("shard entry not found")
errShardNotOpen = errors.New("shard is not open")
errShardAlreadyTicking = errors.New("shard is already ticking")
errShardClosingTickTerminated = errors.New("shard is closing, terminating tick")
)

type filesetBeforeFn func(filePathPrefix string, namespace ts.ID, shardID uint32, t time.Time) ([]string, error)
@@ -66,7 +68,7 @@ type tickPolicy int

const (
tickPolicyRegular tickPolicy = iota
tickPolicyForceExpiry
tickPolicyCloseShard
)

type dbShardState int
@@ -100,6 +102,8 @@ type dbShard struct {
identifierPool ts.IdentifierPool
contextPool context.Pool
flushState shardFlushState
ticking bool
tickWg *sync.WaitGroup
runtimeOptsListenClosers []xclose.SimpleCloser
currRuntimeOptions dbShardRuntimeOptions
metrics dbShardMetrics
@@ -206,6 +210,7 @@ func newDatabaseShard(
identifierPool: opts.IdentifierPool(),
contextPool: opts.ContextPool(),
flushState: newShardFlushState(),
tickWg: &sync.WaitGroup{},
metrics: newDatabaseShardMetrics(scope),
}
d.insertQueue = newDatabaseShardInsertQueue(d.insertSeriesBatch,
@@ -360,30 +365,80 @@ func (s *dbShard) Close() error {
stopwatch.Stop()
}()

// NB(prateek): wait till any existing ticks are finished. In the usual
// case, no other ticks are running, and tickWg count is at 0, so the
// call to Wait() will return immediately.
// In the case when there is an existing Tick running, the count for
// tickWg will be > 0, and we'll wait until it's reset to zero, which
// will happen because earlier in this function we set the shard state
// to dbShardStateClosing, which triggers an early termination of
// any active ticks.
s.tickWg.Wait()

// NB(r): Asynchronously we purge expired series to ensure pressure on the
// GC is not placed all at one time. If the deadline is too low and still
// causes the GC to impact performance when closing shards the deadline
// should be increased.
cancellable := context.NewNoOpCanncellable()
s.tickAndExpire(cancellable, tickPolicyForceExpiry)
_, err := s.tickAndExpire(cancellable, tickPolicyCloseShard)
return err
}

return nil
func (s *dbShard) isClosing() bool {
s.RLock()
closing := s.isClosingWithLock()
s.RUnlock()
return closing
}

func (s *dbShard) Tick(c context.Cancellable) tickResult {
func (s *dbShard) isClosingWithLock() bool {
return s.state == dbShardStateClosing
}

func (s *dbShard) Tick(c context.Cancellable) (tickResult, error) {
s.removeAnyFlushStatesTooEarly()
return s.tickAndExpire(c, tickPolicyRegular)
}

func (s *dbShard) tickAndExpire(
c context.Cancellable,
policy tickPolicy,
) tickResult {
) (tickResult, error) {
s.Lock()
// ensure only one tick can execute at a time
if s.ticking {
s.Unlock()
// i.e. we were previously ticking
return tickResult{}, errShardAlreadyTicking
}

// NB(prateek): we bail out early if the shard is closing,
// unless it's the final tick issued during the Close(). This
// final tick is required to release resources back to our pools.
if policy != tickPolicyCloseShard && s.isClosingWithLock() {
s.Unlock()
return tickResult{}, errShardClosingTickTerminated
}

// enable Close() to track the lifecycle of the tick
s.ticking = true
s.tickWg.Add(1)
s.Unlock()

// reset ticking state
defer func() {
s.Lock()
s.ticking = false
s.tickWg.Done()
s.Unlock()
}()

var (
r tickResult
expired []series.DatabaseSeries
i int
slept time.Duration
r tickResult
expired []series.DatabaseSeries
terminatedTickingDueToClosing bool
i int
slept time.Duration
)
s.RLock()
tickSleepBatch := s.currRuntimeOptions.tickSleepSeriesBatchSize
@@ -397,6 +452,13 @@ func (s *dbShard) tickAndExpire(
if c.IsCancelled() {
return false
}
// NB(prateek): Also bail out early if the shard is closing,
// unless it's the final tick issued during the Close(). This
// final tick is required to release resources back to our pools.
if policy != tickPolicyCloseShard && s.isClosing() {
terminatedTickingDueToClosing = true
return false
}
// Throttle the tick
sleepFor := time.Duration(tickSleepBatch) * tickSleepPerSeries
s.sleepFn(sleepFor)
@@ -409,7 +471,7 @@ func (s *dbShard) tickAndExpire(
switch policy {
case tickPolicyRegular:
result, err = entry.series.Tick()
case tickPolicyForceExpiry:
case tickPolicyCloseShard:
err = series.ErrSeriesAllDatapointsExpired
}
if err == series.ErrSeriesAllDatapointsExpired {
@@ -449,7 +511,11 @@ func (s *dbShard) tickAndExpire(
s.purgeExpiredSeries(expired)
}

return r
if terminatedTickingDueToClosing {
return tickResult{}, errShardClosingTickTerminated
}

return r, nil
}

func (s *dbShard) purgeExpiredSeries(expired []series.DatabaseSeries) {
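Taken together, the shard changes above are the actual race fix: a ticking flag taken under the shard lock guarantees at most one tick per shard, a WaitGroup lets Close wait out an in-flight tick, and the close-policy tick is the only one allowed through once the shard is closing. The condensed sketch below shows just that hand-off. miniShard, tick, close, and the work/finalTick callbacks are illustrative stand-ins; the real dbShard carries far more state and walks its series list where the sketch calls work().

    package shardsketch

    import (
        "errors"
        "sync"
    )

    var (
        errShardAlreadyTicking        = errors.New("shard is already ticking")
        errShardClosingTickTerminated = errors.New("shard is closing, terminating tick")
    )

    type tickPolicy int

    const (
        tickPolicyRegular tickPolicy = iota
        tickPolicyCloseShard
    )

    // miniShard keeps just enough state to demonstrate the guard added in
    // this commit: a ticking flag protected by the mutex, and a WaitGroup
    // that close uses to wait for an in-flight tick to drain.
    type miniShard struct {
        sync.RWMutex
        closing bool
        ticking bool
        tickWg  sync.WaitGroup
    }

    func (s *miniShard) tick(policy tickPolicy, work func()) error {
        s.Lock()
        if s.ticking {
            // Another tick is already running; refuse to race it.
            s.Unlock()
            return errShardAlreadyTicking
        }
        if policy != tickPolicyCloseShard && s.closing {
            // Regular ticks on a closing shard bail out early; only the
            // final close-policy tick may run, to release resources.
            s.Unlock()
            return errShardClosingTickTerminated
        }
        s.ticking = true
        s.tickWg.Add(1)
        s.Unlock()

        defer func() {
            s.Lock()
            s.ticking = false
            s.tickWg.Done()
            s.Unlock()
        }()

        work() // the real shard iterates its series list here
        return nil
    }

    func (s *miniShard) close(finalTick func()) error {
        s.Lock()
        s.closing = true
        s.Unlock()

        // Wait for any in-flight tick; it observes closing (or finishes)
        // and releases the WaitGroup.
        s.tickWg.Wait()

        // The final tick runs with the close policy so every series is
        // expired and returned to the pools.
        return s.tick(tickPolicyCloseShard, finalTick)
    }

The real tickAndExpire additionally re-checks isClosing() inside its batched series loop, so a long-running tick notices Close promptly instead of only at entry, and Close surfaces the final tick's error to its caller.
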