Add initial WAL implementation and tests #3569

Merged 20 commits on Aug 19, 2015
154 changes: 154 additions & 0 deletions cmd/influx_stress/influx_stress.go
@@ -0,0 +1,154 @@
package main

import (
"flag"
"fmt"
"math/rand"
"net/url"
"runtime"
"sort"
"sync"
"time"

"github.com/influxdb/influxdb/client"
)

var (
batchSize = flag.Int("batchsize", 5000, "number of points per batch")
seriesCount = flag.Int("series", 100000, "number of unique series to create")
pointCount = flag.Int("points", 100, "number of points per series to create")
concurrency = flag.Int("concurrency", 10, "number of simultaneous writes to run")
batchInterval = flag.Duration("batchinterval", 0*time.Second, "duration between batches")
database = flag.String("database", "stress", "name of database")
address = flag.String("addr", "localhost:8086", "IP address and port of database (e.g., localhost:8086)")
)

func main() {
flag.Parse()
runtime.GOMAXPROCS(runtime.NumCPU())

startTime := time.Now()
counter := NewConcurrencyLimiter(*concurrency)

u, err := url.Parse(fmt.Sprintf("http://%s", *address))
if err != nil {
panic(err)
}
c, err := client.NewClient(client.Config{URL: *u})
if err != nil {
panic(err)
}

var mu sync.Mutex
var wg sync.WaitGroup
responseTimes := make([]int, 0)

totalPoints := 0

for i := 1; i <= *pointCount; i++ {
batch := &client.BatchPoints{
Database: *database,
WriteConsistency: "any",
Time: time.Now(),
Precision: "n",
}

for j := 1; j <= *seriesCount; j++ {
p := client.Point{
Measurement: "cpu",
Tags: map[string]string{"region": "uswest", "host": fmt.Sprintf("host-%d", j)},
Fields: map[string]interface{}{"value": rand.Float64()},
}
batch.Points = append(batch.Points, p)
if len(batch.Points) >= *batchSize {
wg.Add(1)
counter.Increment()
totalPoints += len(batch.Points)
go func(b *client.BatchPoints, total int) {
st := time.Now()
if _, err := c.Write(*b); err != nil {
fmt.Println("ERROR: ", err.Error())
} else {
mu.Lock()
responseTimes = append(responseTimes, int(time.Since(st).Nanoseconds()))
mu.Unlock()
}
wg.Done()
counter.Decrement()
if total%1000000 == 0 {
fmt.Printf("%d total points. %d in %s\n", total, *batchSize, time.Since(st))
}
}(batch, totalPoints)

batch = &client.BatchPoints{
Database: *database,
WriteConsistency: "any",
Precision: "n",
}
}
}
}

wg.Wait()
sort.Sort(sort.Reverse(sort.IntSlice(responseTimes)))

total := int64(0)
for _, t := range responseTimes {
total += int64(t)
}
mean := int64(0)
if len(responseTimes) > 0 {
mean = total / int64(len(responseTimes))
}

fmt.Printf("Wrote %d points at average rate of %.0f\n", totalPoints, float64(totalPoints)/time.Since(startTime).Seconds())
fmt.Println("Average response time: ", time.Duration(mean))
fmt.Println("Slowest response times:")
slowest := responseTimes
if len(slowest) > 100 {
slowest = slowest[:100]
}
for _, r := range slowest {
fmt.Println(time.Duration(r))
}
}

// ConcurrencyLimiter is a goroutine-safe struct that can be used to
// ensure that no more than a specified max number of goroutines are
// executing.
type ConcurrencyLimiter struct {
inc chan chan struct{}
dec chan struct{}
max int
count int
}

// NewConcurrencyLimiter returns a configured limiter that will
// ensure that calls to Increment will block if the max is hit.
func NewConcurrencyLimiter(max int) *ConcurrencyLimiter {
c := &ConcurrencyLimiter{
inc: make(chan chan struct{}),
dec: make(chan struct{}, max),
max: max,
}
go c.handleLimits()
return c
}

// Increment will increase the count of running goroutines by 1.
// If the number is currently at the max, the call to Increment
// will block until another goroutine decrements.
func (c *ConcurrencyLimiter) Increment() {
r := make(chan struct{})
c.inc <- r
<-r
}

// Decrement will reduce the count of running goroutines by 1.
func (c *ConcurrencyLimiter) Decrement() {
c.dec <- struct{}{}
}

// handleLimits runs in a goroutine to manage the count of
// running goroutines.
func (c *ConcurrencyLimiter) handleLimits() {
for {
r := <-c.inc
if c.count >= c.max {
<-c.dec
c.count--
}
c.count++
r <- struct{}{}
}
}
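
The limiter is used by pairing every Increment with a Decrement once the goroutine finishes. A minimal usage sketch, assuming it sits in this same package and reuses the file's fmt and sync imports (the job count here is illustrative, not part of this change):

limiter := NewConcurrencyLimiter(4) // at most 4 goroutines in flight
var wg sync.WaitGroup
for i := 0; i < 20; i++ {
    wg.Add(1)
    limiter.Increment() // blocks while 4 workers are already running
    go func(n int) {
        defer wg.Done()
        defer limiter.Decrement() // frees a slot for a blocked Increment
        fmt.Println("working on job", n)
    }(i)
}
wg.Wait()

This mirrors main above: Increment before each batch writer is launched, Decrement when its write returns.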
5 changes: 4 additions & 1 deletion cmd/influxd/run/server.go
@@ -68,6 +68,9 @@ type Server struct {
// NewServer returns a new instance of Server built from a config.
func NewServer(c *Config, version string) (*Server, error) {
// Construct base meta store and data store.
tsdbStore := tsdb.NewStore(c.Data.Dir)
tsdbStore.EngineOptions.Config = c.Data

s := &Server{
version: version,
err: make(chan error),
@@ -77,7 +80,7 @@ func NewServer(c *Config, version string) (*Server, error) {
BindAddress: c.Meta.BindAddress,

MetaStore: meta.NewStore(c.Meta),
TSDBStore: tsdb.NewStore(c.Data.Dir),
TSDBStore: tsdbStore,

reportingDisabled: c.ReportingDisabled,
}
2 changes: 2 additions & 0 deletions cmd/influxd/run/server_helpers_test.go
@@ -35,6 +35,7 @@ func NewServer(c *run.Config) *Server {
Server: srv,
Config: c,
}
s.TSDBStore.EngineOptions.Config = c.Data
configureLogging(&s)
return &s
}
@@ -155,6 +156,7 @@ func NewConfig() *run.Config {
c.Meta.CommitTimeout = toml.Duration(5 * time.Millisecond)

c.Data.Dir = MustTempFile()
c.Data.WALDir = MustTempFile()

c.HintedHandoff.Dir = MustTempFile()

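
Both Dir and WALDir get fresh temporary locations so parallel tests never share state. MustTempFile is a pre-existing test helper that is not part of this diff; a sketch of the shape such a helper usually has (an assumption inferred from usage, not the repo's actual code):

// MustTempFile returns the path of a new temporary file and panics
// if one cannot be created. Sketch only; the real helper may differ.
func MustTempFile() string {
    f, err := ioutil.TempFile("", "influxd-") // needs "io/ioutil"
    if err != nil {
        panic(err)
    }
    f.Close()
    return f.Name()
}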
27 changes: 27 additions & 0 deletions etc/config.sample.toml
@@ -36,10 +36,37 @@ reporting-disabled = false

[data]
dir = "/var/opt/influxdb/data"

# The following WAL settings are for the b1 storage engine used in 0.9.2. They won't
Contributor: What about the bz1 engine?

Contributor: Oh, bz1 didn't exist in 0.9.2.....

# apply to any new shards created after upgrading to a version > 0.9.3.
max-wal-size = 104857600 # Maximum size the WAL can reach before a flush. Defaults to 100MB.
wal-flush-interval = "10m" # Maximum time data can sit in WAL before a flush.
wal-partition-flush-delay = "2s" # The delay time between each WAL partition being flushed.

# These are the WAL settings for the storage engine >= 0.9.3
wal-dir = "/var/opt/influxdb/wal"
wal-enable-logging = true
Contributor: Just want to check that you want this logging on by default (that it is not too verbose).

Member Author: yes, right now the only thing that gets logged is when a compaction is run, which is infrequent.


# When a series in the WAL in-memory cache reaches this size in bytes it is marked as ready to
# flush to the index
# wal-ready-series-size = 25600

# Flush and compact a partition once this ratio of series are over the ready size
# wal-compaction-threshold = 0.6

# Force a flush and compaction if any series in a partition gets above this size in bytes
# wal-max-series-size = 2097152

# Force a flush of all series and full compaction if there have been no writes in this
# amount of time. This is useful for ensuring that shards that are cold for writes don't
# keep a bunch of data cached in memory and in the WAL.
# wal-flush-cold-interval = "10m"

# Force a partition to flush its largest series if it reaches this approximate size in
# bytes. Remember there are 5 partitions so you'll need at least 5x this amount of memory.
# The more memory you have, the bigger this can be.
# wal-partition-size-threshold = 20971520

###
### [cluster]
###
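Each of these [data] keys maps onto a toml tag of the tsdb.Config struct added in tsdb/config.go below. A hedged sketch of that mapping, decoding just the [data] section with the BurntSushi toml package (the config path is hypothetical, and influxd's real loader goes through its own config plumbing):

import (
    "log"

    "github.com/BurntSushi/toml"
    "github.com/influxdb/influxdb/tsdb"
)

func loadDataConfig() tsdb.Config {
    var root struct {
        Data tsdb.Config `toml:"data"`
    }
    root.Data = tsdb.NewConfig() // start from defaults; keys absent from the file keep them
    if _, err := toml.DecodeFile("/etc/influxdb/influxdb.conf", &root); err != nil {
        log.Fatal(err)
    }
    return root.Data
}

Seeding root.Data with tsdb.NewConfig() matters: DecodeFile only overwrites keys that appear in the file, so defaults such as WALEnableLogging survive a sparse config.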
44 changes: 43 additions & 1 deletion tsdb/config.go
@@ -16,19 +16,61 @@ const (

// DefaultWALPartitionFlushDelay is the sleep time between WAL partition flushes.
DefaultWALPartitionFlushDelay = 2 * time.Second

// tsdb/engine/wal configuration options

// DefaultReadySeriesSize of 30KB specifies when a series is eligible to be flushed
DefaultReadySeriesSize = 30 * 1024

// DefaultCompactionThreshold is the fraction of series over the ready size that triggers a flush and compaction of a partition
DefaultCompactionThreshold = 0.5

// DefaultMaxSeriesSize specifies the size at which a series will be forced to flush
DefaultMaxSeriesSize = 1024 * 1024

// DefaultFlushColdInterval specifies how long after a partition has been cold
// for writes that a full flush and compaction are forced
DefaultFlushColdInterval = 5 * time.Minute

// DefaultPartitionSizeThreshold specifies the size in memory at which a partition
// should slow down writes until it gets a chance to compact.
// This forces backpressure onto clients that are writing too fast. We need
// this because the WAL can take writes much faster than the index, so without
// backpressure we'd eventually fill up the memory and die.
// This number multiplied by the partition count is roughly the max possible memory
// size for the in-memory WAL cache.
DefaultPartitionSizeThreshold = 20 * 1024 * 1024 // 20MB
)

type Config struct {
Dir string `toml:"dir"`
Dir string `toml:"dir"`

// WAL config options for b1 (introduced in 0.9.2)
MaxWALSize int `toml:"max-wal-size"`
WALFlushInterval toml.Duration `toml:"wal-flush-interval"`
WALPartitionFlushDelay toml.Duration `toml:"wal-partition-flush-delay"`

// WAL configuration options for bz1 (introduced in 0.9.3)
WALDir string `toml:"wal-dir"`
WALEnableLogging bool `toml:"wal-enable-logging"`
WALReadySeriesSize int `toml:"wal-ready-series-size"`
WALCompactionThreshold float64 `toml:"wal-compaction-threshold"`
WALMaxSeriesSize int `toml:"wal-max-series-size"`
WALFlushColdInterval toml.Duration `toml:"wal-flush-cold-interval"`
WALPartitionSizeThreshold uint64 `toml:"wal-partition-size-threshold"`
}

func NewConfig() Config {
return Config{
MaxWALSize: DefaultMaxWALSize,
WALFlushInterval: toml.Duration(DefaultWALFlushInterval),
WALPartitionFlushDelay: toml.Duration(DefaultWALPartitionFlushDelay),

WALEnableLogging: true,
WALReadySeriesSize: DefaultReadySeriesSize,
WALCompactionThreshold: DefaultCompactionThreshold,
WALMaxSeriesSize: DefaultMaxSeriesSize,
WALFlushColdInterval: toml.Duration(DefaultFlushColdInterval),
WALPartitionSizeThreshold: DefaultPartitionSizeThreshold,
}
}
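
The DefaultPartitionSizeThreshold comment above implies a simple memory bound: threshold times partition count. A quick sketch of that arithmetic using the defaults (the count of 5 comes from the sample config's "Remember there are 5 partitions" note; it is not a constant exported here):

package main

import (
    "fmt"

    "github.com/influxdb/influxdb/tsdb"
)

func main() {
    const partitionCount = 5 // per the sample config comment
    cfg := tsdb.NewConfig()
    // Rough upper bound on the in-memory WAL cache:
    maxCache := cfg.WALPartitionSizeThreshold * partitionCount
    fmt.Printf("max WAL cache ~= %d bytes\n", maxCache) // prints 104857600, i.e. 100MB
}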
37 changes: 35 additions & 2 deletions tsdb/engine.go
@@ -1,10 +1,12 @@
package tsdb

import (
"bytes"
"errors"
"fmt"
"io"
"os"
"sort"
"time"

"github.com/boltdb/bolt"
@@ -16,7 +18,7 @@
)

// DefaultEngine is the default engine used by the shard when initializing.
const DefaultEngine = "b1"
const DefaultEngine = "bz1"

// Engine represents a swappable storage engine for the shard.
type Engine interface {
@@ -52,7 +54,7 @@ func RegisterEngine(name string, fn NewEngineFunc) {
func NewEngine(path string, options EngineOptions) (Engine, error) {
// Create a new engine
if _, err := os.Stat(path); os.IsNotExist(err) {
return newEngineFuncs[DefaultEngine](path, options), nil
return newEngineFuncs[options.EngineVersion](path, options), nil
}

// Only bolt-based backends are currently supported so open it and check the format.
@@ -96,17 +98,22 @@ func NewEngine(path string, options EngineOptions) (Engine, error) {

// EngineOptions represents the options used to initialize the engine.
type EngineOptions struct {
EngineVersion string
MaxWALSize int
WALFlushInterval time.Duration
WALPartitionFlushDelay time.Duration

Config Config
}

// NewEngineOptions returns the default options.
func NewEngineOptions() EngineOptions {
return EngineOptions{
EngineVersion: DefaultEngine,
MaxWALSize: DefaultMaxWALSize,
WALFlushInterval: DefaultWALFlushInterval,
WALPartitionFlushDelay: DefaultWALPartitionFlushDelay,
Config: NewConfig(),
}
}

@@ -125,3 +132,29 @@ type Cursor interface {
Seek(seek []byte) (key, value []byte)
Next() (key, value []byte)
}

// DedupeEntries returns slices with unique keys (the first 8 bytes).
func DedupeEntries(a [][]byte) [][]byte {
// Convert to a map where the last slice is used.
m := make(map[string][]byte)
for _, b := range a {
m[string(b[0:8])] = b
}

// Convert map back to a slice of byte slices.
other := make([][]byte, 0, len(m))
for _, v := range m {
other = append(other, v)
}

// Sort entries.
sort.Sort(ByteSlices(other))

return other
}

type ByteSlices [][]byte

func (a ByteSlices) Len() int { return len(a) }
func (a ByteSlices) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a ByteSlices) Less(i, j int) bool { return bytes.Compare(a[i], a[j]) == -1 }
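
DedupeEntries keys each entry on its first 8 bytes and keeps the last occurrence before sorting. A small illustration, assuming it runs inside this package (the keys and payload bytes are made up):

entries := [][]byte{
    append([]byte("series01"), 'a'),
    append([]byte("series02"), 'b'),
    append([]byte("series01"), 'c'), // same 8-byte key as the first entry, so it wins
}
for _, e := range DedupeEntries(entries) {
    fmt.Printf("%s\n", e)
}
// Prints, sorted, with the last duplicate kept:
// series01c
// series02b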
2 changes: 1 addition & 1 deletion tsdb/engine/b1/b1.go
@@ -194,7 +194,7 @@ func (e *Engine) LoadMetadataIndex(index *tsdb.DatabaseIndex, measurementFields
meta = tx.Bucket([]byte("series"))
c = meta.Cursor()
for k, v := c.First(); k != nil; k, v = c.Next() {
series := &tsdb.Series{}
series := tsdb.NewSeries("", nil)
if err := series.UnmarshalBinary(v); err != nil {
return err
}
2 changes: 1 addition & 1 deletion tsdb/engine/b1/b1_test.go
@@ -22,7 +22,7 @@ func TestEngine_WritePoints(t *testing.T) {
mf := &tsdb.MeasurementFields{Fields: make(map[string]*tsdb.Field)}
mf.CreateFieldIfNotExists("value", influxql.Float)
seriesToCreate := []*tsdb.SeriesCreate{
{Series: &tsdb.Series{Key: string(tsdb.MakeKey([]byte("temperature"), nil))}},
{Series: tsdb.NewSeries(string(tsdb.MakeKey([]byte("temperature"), nil)), nil)},
}

// Parse point.
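tsdb.NewSeries replaces the &tsdb.Series{...} literals at both call sites above, but the constructor itself is not part of this diff. Inferred from its (key string, tags map[string]string) usage, its shape is roughly the following sketch (the real constructor may initialize additional state):

// Sketch inferred from the call sites; not the actual tsdb code.
func NewSeries(key string, tags map[string]string) *Series {
    return &Series{
        Key:  key,
        Tags: tags,
    }
}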