Skip to content

Commit

Permalink
Merge branch 'master' into fix/check-error
Browse files Browse the repository at this point in the history
  • Loading branch information
asalem1 authored Oct 24, 2019
2 parents 7a68f10 + 45944a4 commit 50e3f14
Show file tree
Hide file tree
Showing 13 changed files with 445 additions and 16 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
1. [15510](https://github.com/influxdata/influxdb/pull/15510): UI/Telegraf sort functionality fixed
1. [15549](https://github.com/influxdata/influxdb/pull/15549): UI/Task edit functionality fixed
1. [15556](https://github.com/influxdata/influxdb/pull/15556): Creating a check now displays on the checklist
1. [15559](https://github.com/influxdata/influxdb/pull/15559): Exiting a configuration of a dashboard cell now properly renders the cell content

## v2.0.0-alpha.18 [2019-09-26]

Expand Down
9 changes: 9 additions & 0 deletions query/stdlib/influxdata/influxdb/rules.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,15 @@ func rewritePushableExpr(e semantic.Expression) (semantic.Expression, bool) {
e.Left, e.Right = left, right
return e, true
}

case *semantic.LogicalExpression:
left, lok := rewritePushableExpr(e.Left)
right, rok := rewritePushableExpr(e.Right)
if lok || rok {
e = e.Copy().(*semantic.LogicalExpression)
e.Left, e.Right = left, right
return e, true
}
}
return e, false
}
Expand Down
67 changes: 67 additions & 0 deletions query/stdlib/influxdata/influxdb/rules_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,6 +581,73 @@ func TestPushDownFilterRule(t *testing.T) {
},
NoChange: true,
},
{
Name: `r._measurement == "cpu" and exists r.host`,
Rules: []plan.Rule{influxdb.PushDownFilterRule{}},
Before: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreatePhysicalNode("ReadRange", &influxdb.ReadRangePhysSpec{
Bounds: bounds,
}),
plan.CreatePhysicalNode("filter", &universe.FilterProcedureSpec{
Fn: makeResolvedFilterFn(&semantic.LogicalExpression{
Operator: ast.AndOperator,
Left: &semantic.BinaryExpression{
Operator: ast.EqualOperator,
Left: &semantic.MemberExpression{
Object: &semantic.IdentifierExpression{Name: "r"},
Property: "host",
},
Right: &semantic.StringLiteral{
Value: "cpu",
},
},
Right: &semantic.UnaryExpression{
Operator: ast.ExistsOperator,
Argument: &semantic.MemberExpression{
Object: &semantic.IdentifierExpression{Name: "r"},
Property: "host",
},
},
}),
}),
},
Edges: [][2]int{
{0, 1},
},
},
After: &plantest.PlanSpec{
Nodes: []plan.Node{
plan.CreatePhysicalNode("merged_ReadRange_filter", &influxdb.ReadRangePhysSpec{
Bounds: bounds,
FilterSet: true,
Filter: makeFilterFn(&semantic.LogicalExpression{
Operator: ast.AndOperator,
Left: &semantic.BinaryExpression{
Operator: ast.EqualOperator,
Left: &semantic.MemberExpression{
Object: &semantic.IdentifierExpression{Name: "r"},
Property: "host",
},
Right: &semantic.StringLiteral{
Value: "cpu",
},
},
Right: &semantic.BinaryExpression{
Operator: ast.NotEqualOperator,
Left: &semantic.MemberExpression{
Object: &semantic.IdentifierExpression{Name: "r"},
Property: "host",
},
Right: &semantic.StringLiteral{
Value: "",
},
},
}),
}),
},
},
},
}

for _, tc := range tests {
Expand Down
56 changes: 56 additions & 0 deletions semaphore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package influxdb

import (
"context"
"errors"
"time"
)

// ErrNoAcquire is returned when it was not possible to acquire ownership of the
// semaphore.
var ErrNoAcquire = errors.New("ownership not acquired")

// DefaultLeaseTTL is used when a specific lease TTL is not requested.
const DefaultLeaseTTL = time.Minute

// A Semaphore provides an API for requesting ownership of an expirable semaphore.
//
// Acquired semaphores have an expiration. If they're not released or kept alive
// during this period then they will expire and ownership of the semaphore will
// be lost.
//
// TODO(edd): add AcquireTTL when needed. It should block.
type Semaphore interface {
// TryAcquire attempts to acquire ownership of the semaphore. TryAcquire
// must not block. Failure to get ownership of the semaphore should be
// signalled to the caller via the return of the ErrNoAcquire error.
TryAcquire(ctx context.Context, ttl time.Duration) (Lease, error)
}

// A Lease represents ownership over a semaphore. It gives the owner the ability
// to extend ownership over the semaphore or release ownership of the semaphore.
type Lease interface {
// TTL returns the duration of time remaining before the lease expires.
TTL(context.Context) (time.Duration, error)

// Release terminates ownership of the semaphore by revoking the lease.
Release(context.Context) error

// KeepAlive extends the lease back to the original TTL.
KeepAlive(context.Context) error
}

// NopSemaphore is a Semaphore that always hands out leases.
var NopSemaphore Semaphore = nopSemaphore{}

type nopSemaphore struct{}

func (nopSemaphore) TryAcquire(ctx context.Context, ttl time.Duration) (Lease, error) {
return nopLease{}, nil
}

type nopLease struct{}

func (nopLease) TTL(context.Context) (time.Duration, error) { return DefaultLeaseTTL, nil }
func (nopLease) Release(context.Context) error { return nil }
func (nopLease) KeepAlive(context.Context) error { return nil }
9 changes: 9 additions & 0 deletions storage/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"sync"
"time"

"github.com/influxdata/influxdb"
platform "github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/kit/tracing"
"github.com/influxdata/influxdb/logger"
Expand Down Expand Up @@ -141,6 +142,14 @@ func WithCompactionLimiter(limiter limiter.Fixed) Option {
}
}

// WithCompactionSemaphore sets the semaphore used to coordinate full compactions
// across multiple storage engines.
func WithCompactionSemaphore(s influxdb.Semaphore) Option {
return func(e *Engine) {
e.engine.SetSemaphore(s)
}
}

// NewEngine initialises a new storage engine, including a series file, index and
// TSM engine.
func NewEngine(path string, c Config, options ...Option) *Engine {
Expand Down
2 changes: 1 addition & 1 deletion tsdb/series_partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (p *SeriesPartition) Open() error {

if err := p.index.Open(); err != nil {
return err
} else if p.index.Recover(p.segments); err != nil {
} else if err = p.index.Recover(p.segments); err != nil {
return err
}
return nil
Expand Down
69 changes: 69 additions & 0 deletions tsdb/tsm1/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"sync/atomic"
"time"

"github.com/influxdata/influxdb"
"github.com/influxdata/influxdb/kit/tracing"
"github.com/influxdata/influxdb/logger"
"github.com/influxdata/influxdb/models"
Expand Down Expand Up @@ -160,6 +161,10 @@ type Engine struct {

// Limiter for concurrent compactions.
compactionLimiter limiter.Fixed
// A semaphore for limiting full compactions across multiple engines.
fullCompactionSemaphore influxdb.Semaphore
// Tracks how long the last full compaction took. Should be accessed atomically.
lastFullCompactionDuration int64

scheduler *scheduler
snapshotter Snapshotter
Expand Down Expand Up @@ -220,6 +225,7 @@ func NewEngine(path string, idx *tsi1.Index, config Config, options ...EngineOpt
enableCompactionsOnOpen: true,
formatFileName: DefaultFormatFileName,
compactionLimiter: limiter.NewFixed(maxCompactions),
fullCompactionSemaphore: influxdb.NopSemaphore,
scheduler: newScheduler(maxCompactions),
snapshotter: new(noSnapshotter),
}
Expand All @@ -231,6 +237,12 @@ func NewEngine(path string, idx *tsi1.Index, config Config, options ...EngineOpt
return e
}

// SetSemaphore sets the semaphore used to coordinate full compactions across
// multiple engines.
func (e *Engine) SetSemaphore(s influxdb.Semaphore) {
e.fullCompactionSemaphore = s
}

// WithCompactionLimiter sets the compaction limiter, which is used to limit the
// number of concurrent compactions.
func (e *Engine) WithCompactionLimiter(limiter limiter.Fixed) {
Expand Down Expand Up @@ -1104,21 +1116,78 @@ func (e *Engine) compactFull(ctx context.Context, grp CompactionGroup, wg *sync.

// Try the lo priority limiter, otherwise steal a little from the high priority if we can.
if e.compactionLimiter.TryTake() {
// Attempt to get ownership of the semaphore for this engine. If the
// default semaphore is in use then ownership will always be granted.
ttl := influxdb.DefaultLeaseTTL
lastCompaction := time.Duration(atomic.LoadInt64(&e.lastFullCompactionDuration))
if lastCompaction > ttl {
ttl = lastCompaction // If the last full compaction took > default ttl then set a new TTL
}

lease, err := e.fullCompactionSemaphore.TryAcquire(ctx, ttl)
if err == influxdb.ErrNoAcquire {
e.logger.Info("Cannot acquire semaphore ownership to carry out full compaction", zap.Duration("semaphore_requested_ttl", ttl))
e.compactionLimiter.Release()
return false
} else if err != nil {
e.logger.Warn("Failed to execute full compaction", zap.Error(err), zap.Duration("semaphore_requested_ttl", ttl))
e.compactionLimiter.Release()
return false
} else if e.fullCompactionSemaphore != influxdb.NopSemaphore {
e.logger.Info("Acquired semaphore ownership for full compaction", zap.Duration("semaphore_requested_ttl", ttl))
}

ctx, cancel := context.WithCancel(ctx)
go e.keepLeaseAlive(ctx, lease) // context cancelled when compaction finished.

e.compactionTracker.IncFullActive()
wg.Add(1)
go func() {
defer wg.Done()
defer e.compactionTracker.DecFullActive()
defer e.compactionLimiter.Release()

now := time.Now() // Track how long compaction takes
s.Apply(ctx)
atomic.StoreInt64(&e.lastFullCompactionDuration, int64(time.Since(now)))

// Release the files in the compaction plan
e.CompactionPlan.Release([]CompactionGroup{s.group})
cancel()
}()
return true
}
return false
}

// keepLeaseAlive blocks, keeping a lease alive until the context is cancelled.
func (e *Engine) keepLeaseAlive(ctx context.Context, lease influxdb.Lease) {
ttl, err := lease.TTL(ctx)
if err != nil {
e.logger.Warn("unable to get TTL for lease on semaphore", zap.Error(err))
ttl = influxdb.DefaultLeaseTTL // This is probably a reasonable fallback.
}

// Renew the lease when ttl is halved
ticker := time.NewTicker(ttl / 2)
for {
select {
case <-ctx.Done():
ticker.Stop()
if err := lease.Release(ctx); err != nil {
e.logger.Warn("Lease on sempahore was not released", zap.Error(err))
}
return
case <-ticker.C:
if err := lease.KeepAlive(ctx); err != nil {
e.logger.Warn("Unable to extend lease", zap.Error(err))
} else {
e.logger.Info("Extended lease on semaphore")
}
}
}
}

// compactionStrategy holds the details of what to do in a compaction.
type compactionStrategy struct {
group CompactionGroup
Expand Down
3 changes: 3 additions & 0 deletions ui/jest.config.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ module.exports = {
globals: {
'ts-jest': {
tsConfig: 'tsconfig.test.json',
diagnostics: {
ignoreCodes: [6133] // ignore `'foo' is declared but its value is never read.`
},
},
},
collectCoverage: true,
Expand Down
Loading

0 comments on commit 50e3f14

Please sign in to comment.