Add WAL configuration options
pauldix committed Aug 18, 2015
1 parent 30bcd3e commit 9df3b7d
Showing 14 changed files with 142 additions and 43 deletions.
5 changes: 4 additions & 1 deletion cmd/influxd/run/server.go
@@ -68,6 +68,9 @@ type Server struct {
// NewServer returns a new instance of Server built from a config.
func NewServer(c *Config, version string) (*Server, error) {
// Construct base meta store and data store.
tsdbStore := tsdb.NewStore(c.Data.Dir)
tsdbStore.EngineOptions.Config = c.Data

s := &Server{
version: version,
err: make(chan error),
@@ -77,7 +80,7 @@ func NewServer(c *Config, version string) (*Server, error) {
BindAddress: c.Meta.BindAddress,

MetaStore: meta.NewStore(c.Meta),
TSDBStore: tsdb.NewStore(c.Data.Dir),
TSDBStore: tsdbStore,

reportingDisabled: c.ReportingDisabled,
}
2 changes: 2 additions & 0 deletions cmd/influxd/run/server_helpers_test.go
@@ -35,6 +35,7 @@ func NewServer(c *run.Config) *Server {
Server: srv,
Config: c,
}
s.TSDBStore.EngineOptions.Config = c.Data
configureLogging(&s)
return &s
}
@@ -155,6 +156,7 @@ func NewConfig() *run.Config {
c.Meta.CommitTimeout = toml.Duration(5 * time.Millisecond)

c.Data.Dir = MustTempFile()
c.Data.WALDir = MustTempFile()

c.HintedHandoff.Dir = MustTempFile()

27 changes: 27 additions & 0 deletions etc/config.sample.toml
@@ -36,10 +36,37 @@ reporting-disabled = false

[data]
dir = "/var/opt/influxdb/data"

# The following WAL settings are for the b1 storage engine used in 0.9.2. They won't
# apply to any new shards created after upgrading to a version >= 0.9.3.
max-wal-size = 104857600 # Maximum size the WAL can reach before a flush. Defaults to 100MB.
wal-flush-interval = "10m" # Maximum time data can sit in WAL before a flush.
wal-partition-flush-delay = "2s" # The delay time between each WAL partition being flushed.

# These are the WAL settings for the storage engine >= 0.9.3
wal-dir = "/var/opt/influxdb/wal"
wal-enable-logging = true

# When a series in the WAL in-memory cache reaches this size in bytes it is marked as ready to
# flush to the index
# wal-ready-series-size = 25600

# Flush and compact a partition once this ratio of series are over the ready size
# wal-compaction-threshold = 0.6

# Force a flush and compaction if any series in a partition gets above this size in bytes
# wal-max-series-size = 2097152

# Force a flush of all series and full compaction if there have been no writes in this
# amount of time. This is useful for ensuring that shards that are cold for writes don't
# keep a bunch of data cached in memory and in the WAL.
# wal-flush-cold-interval = "10m"

# Force a partition to flush its largest series if it reaches this approximate size in
# bytes. Remember there are 5 partitions so you'll need at least 5x this amount of memory.
# The more memory you have, the bigger this can be.
# wal-partition-size-threshold = 20971520

###
### [cluster]
###
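The `wal-partition-size-threshold` comment above is worth making concrete: the WAL is split into a fixed number of partitions (`PartitionCount = 5` in `tsdb/engine/wal`), so the threshold times five approximates the peak in-memory WAL cache per shard engine. A quick sketch of that arithmetic, using the sample values above:

```go
package main

import "fmt"

func main() {
	// wal-partition-size-threshold from the sample config above.
	const partitionSizeThreshold = 20 * 1024 * 1024 // 20 MiB

	// The WAL uses a fixed number of partitions (PartitionCount = 5).
	const partitionCount = 5

	// Rough ceiling on the in-memory WAL cache: threshold x partitions.
	maxCache := partitionSizeThreshold * partitionCount
	fmt.Printf("~%d MiB peak in-memory WAL cache\n", maxCache/(1024*1024)) // ~100 MiB
}
```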
44 changes: 43 additions & 1 deletion tsdb/config.go
@@ -16,19 +16,61 @@ const (

// DefaultWALPartitionFlushDelay is the sleep time between WAL partition flushes.
DefaultWALPartitionFlushDelay = 2 * time.Second

// tsdb/engine/wal configuration options

// DefaultReadySeriesSize of 30KB specifies when a series is eligible to be flushed
DefaultReadySeriesSize = 30 * 1024

// DefaultCompactionThreshold is the ratio of series over the ready size at which a partition is flushed and compacted
DefaultCompactionThreshold = 0.5

// DefaultMaxSeriesSize specifies the size at which a series will be forced to flush
DefaultMaxSeriesSize = 1024 * 1024

// DefaultFlushColdInterval specifies how long after a partition has been cold
// for writes that a full flush and compaction are forced
DefaultFlushColdInterval = 20 * time.Second

// DefaultPartitionSizeThreshold specifies the size in memory at which writes to a
// partition are slowed down until it gets a chance to compact.
// This will force clients to get backpressure if they're writing too fast. We need
// this because the WAL can take writes much faster than the index. So eventually
// we'll need to create backpressure, otherwise we'll fill up the memory and die.
// This number multiplied by the partition count is roughly the max possible memory
// size for the in-memory WAL cache.
DefaultPartitionSizeThreshold = 20 * 1024 * 1024 // 20MB
)

type Config struct {
Dir string `toml:"dir"`
Dir string `toml:"dir"`

// WAL config options for b1 (introduced in 0.9.2)
MaxWALSize int `toml:"max-wal-size"`
WALFlushInterval toml.Duration `toml:"wal-flush-interval"`
WALPartitionFlushDelay toml.Duration `toml:"wal-partition-flush-delay"`

// WAL configuration options for bz1 (introduced in 0.9.3)
WALDir string `toml:"wal-dir"`
WALEnableLogging bool `toml:"wal-enable-logging"`
WALReadySeriesSize int `toml:"wal-ready-series-size"`
WALCompactionThreshold float64 `toml:"wal-compaction-threshold"`
WALMaxSeriesSize int `toml:"wal-max-series-size"`
WALFlushColdInterval toml.Duration `toml:"wal-flush-cold-interval"`
WALPartitionSizeThreshold uint64 `toml:"wal-partition-size-threshold"`
}

func NewConfig() Config {
return Config{
MaxWALSize: DefaultMaxWALSize,
WALFlushInterval: toml.Duration(DefaultWALFlushInterval),
WALPartitionFlushDelay: toml.Duration(DefaultWALPartitionFlushDelay),

WALEnableLogging: true,
WALReadySeriesSize: DefaultReadySeriesSize,
WALCompactionThreshold: DefaultCompactionThreshold,
WALMaxSeriesSize: DefaultMaxSeriesSize,
WALFlushColdInterval: toml.Duration(DefaultFlushColdInterval),
WALPartitionSizeThreshold: DefaultPartitionSizeThreshold,
}
}
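With the knobs collected on `tsdb.Config`, overriding them in code is one assignment per field. A minimal sketch (import paths assume the 2015 `github.com/influxdb/influxdb` layout; the values are illustrative, not recommendations):

```go
package main

import (
	"time"

	"github.com/influxdb/influxdb/toml"
	"github.com/influxdb/influxdb/tsdb"
)

func main() {
	c := tsdb.NewConfig() // starts from the defaults above
	c.Dir = "/var/opt/influxdb/data"
	c.WALDir = "/var/opt/influxdb/wal" // used by the >= 0.9.3 engine
	c.WALFlushColdInterval = toml.Duration(10 * time.Minute)
	c.WALPartitionSizeThreshold = 20 * 1024 * 1024

	// Hand the config to a store, as cmd/influxd/run/server.go does above.
	store := tsdb.NewStore(c.Dir)
	store.EngineOptions.Config = c
	_ = store
}
```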
3 changes: 3 additions & 0 deletions tsdb/engine.go
@@ -102,6 +102,8 @@ type EngineOptions struct {
MaxWALSize int
WALFlushInterval time.Duration
WALPartitionFlushDelay time.Duration

Config Config
}

// NewEngineOptions returns the default options.
@@ -111,6 +113,7 @@ func NewEngineOptions() EngineOptions {
MaxWALSize: DefaultMaxWALSize,
WALFlushInterval: DefaultWALFlushInterval,
WALPartitionFlushDelay: DefaultWALPartitionFlushDelay,
Config: NewConfig(),
}
}

13 changes: 8 additions & 5 deletions tsdb/engine/bz1/bz1.go
@@ -28,10 +28,6 @@ var (
const (
// Format is the file format name of this engine.
Format = "bz1"

// WALDir is the suffix that is put on the path for
// where the WAL files should be kept for a given shard.
WALDir = "_wal"
)

func init() {
@@ -72,7 +68,14 @@ type WAL interface {
// NewEngine returns a new instance of Engine.
func NewEngine(path string, opt tsdb.EngineOptions) tsdb.Engine {
// create the writer in the configured WAL directory, in a subdirectory with the same name as the shard
w := wal.NewLog(filepath.Join(filepath.Dir(path), filepath.Base(path)+WALDir))
w := wal.NewLog(filepath.Join(opt.Config.WALDir, filepath.Base(path)))

w.ReadySeriesSize = opt.Config.WALReadySeriesSize
w.FlushColdInterval = time.Duration(opt.Config.WALFlushColdInterval)
w.MaxSeriesSize = opt.Config.WALMaxSeriesSize
w.CompactionThreshold = opt.Config.WALCompactionThreshold
w.PartitionSizeThreshold = opt.Config.WALPartitionSizeThreshold

e := &Engine{
path: path,
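The `NewEngine` change above relocates the WAL: instead of a sibling directory with a `_wal` suffix next to the shard, it now lives under the configured `wal-dir`, named after the shard. A small sketch of the path difference (paths are illustrative):

```go
package main

import (
	"fmt"
	"path/filepath"
)

func main() {
	shardPath := "/var/opt/influxdb/data/db/default/1" // a shard's data path
	walDir := "/var/opt/influxdb/wal"                  // opt.Config.WALDir

	// Old layout: WAL beside the shard, with a "_wal" suffix.
	oldPath := filepath.Join(filepath.Dir(shardPath), filepath.Base(shardPath)+"_wal")

	// New layout: WAL under the configured directory, keyed by the shard's base name.
	newPath := filepath.Join(walDir, filepath.Base(shardPath))

	fmt.Println(oldPath) // /var/opt/influxdb/data/db/default/1_wal
	fmt.Println(newPath) // /var/opt/influxdb/wal/1
}
```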
40 changes: 12 additions & 28 deletions tsdb/engine/wal/wal.go
@@ -45,28 +45,6 @@ const (
// DefaultSegmentSize of 2MB is the size at which segment files will be rolled over
DefaultSegmentSize = 2 * 1024 * 1024

// DefaultReadySeriesSize of 30KB specifies when a series is eligible to be flushed
DefaultReadySeriesSize = 30 * 1024

// DefaultCompactionThreshold is the ratio of series over the ready size at which a partition is flushed and compacted
DefaultCompactionThreshold = 0.5

// DefaultMaxSeriesSize specifies the size at which a series will be forced to flush
DefaultMaxSeriesSize = 1024 * 1024

// DefaultFlushColdInterval specifies how long after a partition has been cold
// for writes that a full flush and compaction are forced
DefaultFlushColdInterval = 20 * time.Second

// DefaultPartitionSizeThreshold specifies the size in memory at which writes to a
// partition are slowed down until it gets a chance to compact.
// This will force clients to get backpressure if they're writing too fast. We need
// this because the WAL can take writes much faster than the index. So eventually
// we'll need to create backpressure, otherwise we'll fill up the memory and die.
// This number multiplied by the partition count is roughly the max possible memory
// size for the in-memory WAL cache.
DefaultPartitionSizeThreshold = 20 * 1024 * 1024 // 20MB

// PartitionCount is the number of partitions in the WAL
PartitionCount = 5

@@ -171,6 +149,9 @@ type Log struct {
// Index is the database that series data gets flushed to once it gets compacted
// out of the WAL.
Index IndexWriter

// EnableLogging specifies if detailed logs should be output
EnableLogging bool
}

// IndexWriter is an interface for the indexed database the WAL flushes data to
@@ -188,12 +169,12 @@ func NewLog(path string) *Log {

// these options should be overridden by any options in the config
LogOutput: os.Stderr,
FlushColdInterval: DefaultFlushColdInterval,
FlushColdInterval: tsdb.DefaultFlushColdInterval,
SegmentSize: DefaultSegmentSize,
MaxSeriesSize: DefaultMaxSeriesSize,
CompactionThreshold: DefaultCompactionThreshold,
PartitionSizeThreshold: DefaultPartitionSizeThreshold,
ReadySeriesSize: DefaultReadySeriesSize,
MaxSeriesSize: tsdb.DefaultMaxSeriesSize,
CompactionThreshold: tsdb.DefaultCompactionThreshold,
PartitionSizeThreshold: tsdb.DefaultPartitionSizeThreshold,
ReadySeriesSize: tsdb.DefaultReadySeriesSize,
partitionCount: PartitionCount,
flushCheckInterval: defaultFlushCheckInterval,
}
@@ -217,6 +198,7 @@ func (l *Log) Open() error {
if err != nil {
return err
}
p.enableLogging = l.EnableLogging
l.partitions[uint8(i)] = p
}
if err := l.openPartitionFiles(); err != nil {
@@ -671,8 +653,8 @@ func (l *Log) partition(key []byte) *Partition {
if p == nil {
if p, err := NewPartition(id, l.path, l.SegmentSize, l.PartitionSizeThreshold, l.ReadySeriesSize, l.FlushColdInterval, l.Index); err != nil {
panic(err)

} else {
p.enableLogging = l.EnableLogging
l.partitions[id] = p
}
}
@@ -715,6 +697,8 @@ type Partition struct {
// be flushed because it has been idle for writes.
flushColdInterval time.Duration
lastWriteTime time.Time

enableLogging bool
}

func NewPartition(id uint8, path string, segmentSize int64, sizeThreshold uint64, readySeriesSize int, flushColdInterval time.Duration, index IndexWriter) (*Partition, error) {
6 changes: 3 additions & 3 deletions tsdb/engine/wal/wal_test.go
@@ -334,7 +334,7 @@ func TestWAL_CompactAfterPercentageThreshold(t *testing.T) {
buf.WriteString(fmt.Sprintf("cpu,host=A,region=useast3 value=%.3f %d\n", rand.Float64(), i))

// ensure that as a whole it's not ready for flushing yet
if log.partitions[1].shouldFlush(DefaultMaxSeriesSize, DefaultCompactionThreshold) != noFlush {
if log.partitions[1].shouldFlush(tsdb.DefaultMaxSeriesSize, tsdb.DefaultCompactionThreshold) != noFlush {
t.Fatal("expected partition 1 to return false from shouldFlush")
}
}
@@ -354,7 +354,7 @@ func TestWAL_CompactAfterPercentageThreshold(t *testing.T) {
}

// ensure it is marked as should flush because of the threshold
if log.partitions[1].shouldFlush(DefaultMaxSeriesSize, DefaultCompactionThreshold) != thresholdFlush {
if log.partitions[1].shouldFlush(tsdb.DefaultMaxSeriesSize, tsdb.DefaultCompactionThreshold) != thresholdFlush {
t.Fatal("expected partition 1 to return true from shouldFlush")
}

@@ -451,7 +451,7 @@ func TestWAL_CompactAfterTimeWithoutWrite(t *testing.T) {
time.Sleep(700 * time.Millisecond)

// ensure that as a whole it's not ready for flushing yet
if f := log.partitions[1].shouldFlush(DefaultMaxSeriesSize, DefaultCompactionThreshold); f != noFlush {
if f := log.partitions[1].shouldFlush(tsdb.DefaultMaxSeriesSize, tsdb.DefaultCompactionThreshold); f != noFlush {
t.Fatalf("expected partition 1 to return noFlush from shouldFlush %v", f)
}

3 changes: 3 additions & 0 deletions tsdb/executor_test.go
@@ -5,6 +5,7 @@ import (
"io/ioutil"
"math"
"os"
"path/filepath"
"testing"
"time"

@@ -947,6 +948,8 @@ func testStore() *tsdb.Store {
path, _ := ioutil.TempDir("", "")

store := tsdb.NewStore(path)

store.EngineOptions.Config.WALDir = filepath.Join(path, "wal")
err := store.Open()
if err != nil {
panic(err)
5 changes: 4 additions & 1 deletion tsdb/mapper_test.go
@@ -6,6 +6,7 @@ import (
"io/ioutil"
"os"
"path"
"path/filepath"
"reflect"
"strings"
"testing"
@@ -494,7 +495,9 @@ func TestShardMapper_LocalMapperTagSets(t *testing.T) {
func mustCreateShard(dir string) *tsdb.Shard {
tmpShard := path.Join(dir, "shard")
index := tsdb.NewDatabaseIndex()
sh := tsdb.NewShard(1, index, tmpShard, tsdb.NewEngineOptions())
opts := tsdb.NewEngineOptions()
opts.Config.WALDir = filepath.Join(dir, "wal")
sh := tsdb.NewShard(1, index, tmpShard, opts)
if err := sh.Open(); err != nil {
panic(fmt.Sprintf("error opening shard: %s", err.Error()))
}
12 changes: 12 additions & 0 deletions tsdb/query_executor_test.go
@@ -54,7 +54,9 @@ func TestWritePointsAndExecuteQuery(t *testing.T) {
}

store.Close()
conf := store.EngineOptions.Config
store = tsdb.NewStore(store.Path())
store.EngineOptions.Config = conf
if err := store.Open(); err != nil {
t.Fatalf(err.Error())
}
@@ -84,7 +86,9 @@ func TestWritePointsAndExecuteQuery_Update(t *testing.T) {

// Restart store.
store.Close()
conf := store.EngineOptions.Config
store = tsdb.NewStore(store.Path())
store.EngineOptions.Config = conf
if err := store.Open(); err != nil {
t.Fatalf(err.Error())
}
@@ -145,7 +149,9 @@ func TestDropSeriesStatement(t *testing.T) {
}

store.Close()
conf := store.EngineOptions.Config
store = tsdb.NewStore(store.Path())
store.EngineOptions.Config = conf
store.Open()
executor.Store = store

@@ -215,7 +221,9 @@ func TestDropMeasurementStatement(t *testing.T) {

validateDrop()
store.Close()
conf := store.EngineOptions.Config
store = tsdb.NewStore(store.Path())
store.EngineOptions.Config = conf
store.Open()
executor.Store = store
validateDrop()
@@ -279,7 +287,9 @@ func TestDropDatabase(t *testing.T) {
}

store.Close()
conf := store.EngineOptions.Config
store = tsdb.NewStore(store.Path())
store.EngineOptions.Config = conf
store.Open()
executor.Store = store
executor.ShardMapper = &testShardMapper{store: store}
@@ -344,6 +354,8 @@ func testStoreAndExecutor() (*tsdb.Store, *tsdb.QueryExecutor) {
path, _ := ioutil.TempDir("", "")

store := tsdb.NewStore(path)
store.EngineOptions.Config.WALDir = filepath.Join(path, "wal")

err := store.Open()
if err != nil {
panic(err)
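The capture-and-reapply dance around each restart above (`conf := store.EngineOptions.Config` … `store.EngineOptions.Config = conf`) is needed because `tsdb.NewStore` resets the engine options to the defaults. If it grew more call sites, a helper would be a natural refactor; a hypothetical sketch (`reopenStore` is not part of this commit):

```go
package tsdb_test

import "github.com/influxdb/influxdb/tsdb"

// reopenStore restarts a store while preserving its engine configuration,
// which tsdb.NewStore would otherwise reset to the defaults.
func reopenStore(store *tsdb.Store) (*tsdb.Store, error) {
	conf := store.EngineOptions.Config
	path := store.Path()
	store.Close()

	store = tsdb.NewStore(path)
	store.EngineOptions.Config = conf
	return store, store.Open()
}
```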