Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Add TSM 1.x storage options as flags #19506

Merged
merged 6 commits into from
Sep 4, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ need to update any InfluxDB CLI config profiles with the new port number.
1. [19390](https://github.com/influxdata/influxdb/pull/19390): Record last success and failure run times in the Task
1. [19402](https://github.com/influxdata/influxdb/pull/19402): Inject Task's LatestSuccess Timestamp In Flux Extern
1. [19433](https://github.com/influxdata/influxdb/pull/19433): Add option to dump raw query results in CLI
1. [19506](https://github.com/influxdata/influxdb/pull/19506): Add TSM 1.x storage options as flags

### Bug Fixes

Expand Down
82 changes: 65 additions & 17 deletions cmd/influxd/launcher/launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ import (
jaegerconfig "github.com/uber/jaeger-client-go/config"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/time/rate"
)

const (
Expand Down Expand Up @@ -368,17 +367,74 @@ func launcherOpts(l *Launcher) []cli.Opt {
Default: 10,
Desc: "the number of queries that are allowed to be awaiting execution before new queries are rejected",
},
{
DestP: &l.pageFaultRate,
Flag: "page-fault-rate",
Default: 0,
Desc: "the number of page faults allowed per second in the storage engine",
},
{
DestP: &l.featureFlags,
Flag: "feature-flags",
Desc: "feature flag overrides",
},

// storage configuration
{
DestP: &l.StorageConfig.Data.WALFsyncDelay,
Flag: "storage-wal-fsync-delay",
Desc: "The amount of time that a write will wait before fsyncing. A duration greater than 0 can be used to batch up multiple fsync calls. This is useful for slower disks or when WAL write contention is seen.",
},
{
DestP: &l.StorageConfig.Data.ValidateKeys,
Flag: "storage-validate-keys",
Desc: "Validates incoming writes to ensure keys only have valid unicode characters.",
},
{
DestP: &l.StorageConfig.Data.CacheMaxMemorySize,
Flag: "storage-cache-max-memory-size",
Desc: "The maximum size a shard's cache can reach before it starts rejecting writes.",
},
{
DestP: &l.StorageConfig.Data.CacheSnapshotMemorySize,
Flag: "storage-cache-snapshot-memory-size",
Desc: "The size at which the engine will snapshot the cache and write it to a TSM file, freeing up memory.",
},
{
DestP: &l.StorageConfig.Data.CacheSnapshotWriteColdDuration,
Flag: "storage-cache-snapshot-write-cold-duration",
Desc: "The length of time at which the engine will snapshot the cache and write it to a new TSM file if the shard hasn't received writes or deletes.",
},
{
DestP: &l.StorageConfig.Data.CompactFullWriteColdDuration,
Flag: "storage-compact-full-write-cold-duration",
Desc: "The duration at which the engine will compact all TSM files in a shard if it hasn't received a write or delete.",
},
{
DestP: &l.StorageConfig.Data.CompactThroughputBurst,
Flag: "storage-compact-throughput-burst",
Desc: "The rate limit in bytes per second that we will allow TSM compactions to write to disk.",
},
// limits
{
DestP: &l.StorageConfig.Data.MaxConcurrentCompactions,
Flag: "storage-max-concurrent-compactions",
Desc: "The maximum number of concurrent full and level compactions that can run at one time. A value of 0 results in 50% of runtime.GOMAXPROCS(0) used at runtime. Any number greater than 0 limits compactions to that value. This setting does not apply to cache snapshotting.",
},
{
DestP: &l.StorageConfig.Data.MaxIndexLogFileSize,
Flag: "storage-max-index-log-file-size",
Desc: "The threshold, in bytes, when an index write-ahead log file will compact into an index file. Lower sizes will cause log files to be compacted more quickly and result in lower heap usage at the expense of write throughput.",
},
{
DestP: &l.StorageConfig.Data.SeriesIDSetCacheSize,
Flag: "storage-series-id-set-cache-size",
Desc: "The size of the internal cache used in the TSI index to store previously calculated series results.",
},
{
DestP: &l.StorageConfig.Data.SeriesFileMaxConcurrentSnapshotCompactions,
Flag: "storage-series-file-max-concurrent-snapshot-compactions",
Desc: "The maximum number of concurrent snapshot compactions that can be running at one time across all series partitions in a database.",
},
{
DestP: &l.StorageConfig.Data.TSMWillNeed,
Flag: "storage-tsm-use-madv-willneed",
Desc: "Controls whether we hint to the kernel that we intend to page in mmap'd sections of TSM files.",
},
}
}

Expand Down Expand Up @@ -417,7 +473,8 @@ type Launcher struct {
boltClient *bolt.Client
kvStore kv.SchemaStore
kvService *kv.Service
//TODO fix

// storage engine
engine Engine
StorageConfig storage.Config

Expand Down Expand Up @@ -446,8 +503,6 @@ type Launcher struct {
Stdout io.Writer
Stderr io.Writer
apibackend *http.APIBackend

pageFaultRate int
}

type stoppingScheduler interface {
Expand Down Expand Up @@ -717,13 +772,6 @@ func (m *Launcher) run(ctx context.Context) (err error) {
return err
}

// Enable storage layer page fault limiting if rate set above zero.
var pageFaultLimiter *rate.Limiter
if m.pageFaultRate > 0 {
pageFaultLimiter = rate.NewLimiter(rate.Limit(m.pageFaultRate), 1)
}
_ = pageFaultLimiter

metaClient := meta.NewClient(meta.NewConfig(), m.kvStore)
if err := metaClient.Open(); err != nil {
m.log.Error("Failed to open meta client", zap.Error(err))
Expand Down
5 changes: 1 addition & 4 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ require (
github.com/dgrijalva/jwt-go v3.2.0+incompatible
github.com/dgryski/go-bitstream v0.0.0-20180413035011-3522498ce2c8
github.com/docker/docker v1.13.1 // indirect
github.com/dustin/go-humanize v1.0.0
github.com/editorconfig-checker/editorconfig-checker v0.0.0-20190819115812-1474bdeaf2a2
github.com/elazarl/go-bindata-assetfs v1.0.0
github.com/fatih/color v1.9.0
Expand Down Expand Up @@ -119,7 +120,3 @@ require (
labix.org/v2/mgo v0.0.0-20140701140051-000000000287 // indirect
launchpad.net/gocheck v0.0.0-20140225173054-000000000087 // indirect
)

replace github.com/Sirupsen/logrus => github.com/sirupsen/logrus v1.2.0

replace github.com/influxdata/platform => /dev/null
65 changes: 30 additions & 35 deletions toml/toml.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,30 @@ import (
"encoding"
"errors"
"fmt"
"math"
"os"
"os/user"
"reflect"
"strconv"
"strings"
"time"
"unicode"

"github.com/dustin/go-humanize"
"github.com/spf13/pflag"
)

// Duration is a TOML wrapper type for time.Duration.
type Duration time.Duration

var _ pflag.Value = (*Duration)(nil)

func (d *Duration) Set(s string) error {
return d.UnmarshalText([]byte(s))
}

func (d Duration) Type() string {
return "Duration"
}

// String returns the string representation of the duration.
func (d Duration) String() string {
return time.Duration(d).String()
Expand Down Expand Up @@ -51,48 +62,32 @@ func (d Duration) MarshalText() (text []byte, err error) {
// and "g" or "G" for gibibytes. If a size suffix isn't specified then bytes are assumed.
type Size uint64

var _ pflag.Value = (*Size)(nil)

func (s Size) String() string {
return humanize.IBytes(uint64(s))
}

func (s *Size) Set(d string) error {
return s.UnmarshalText([]byte(d))
}

func (s Size) Type() string {
return "Size"
}

// UnmarshalText parses a byte size from text.
func (s *Size) UnmarshalText(text []byte) error {
if len(text) == 0 {
return fmt.Errorf("size was empty")
}

// The multiplier defaults to 1 in case the size has
// no suffix (and is then just raw bytes)
mult := uint64(1)

// Preserve the original text for error messages
sizeText := text

// Parse unit of measure
suffix := text[len(sizeText)-1]
if !unicode.IsDigit(rune(suffix)) {
switch suffix {
case 'k', 'K':
mult = 1 << 10 // KiB
case 'm', 'M':
mult = 1 << 20 // MiB
case 'g', 'G':
mult = 1 << 30 // GiB
default:
return fmt.Errorf("unknown size suffix: %c (expected k, m, or g)", suffix)
}
sizeText = sizeText[:len(sizeText)-1]
}

// Parse numeric portion of value.
size, err := strconv.ParseUint(string(sizeText), 10, 64)
v, err := humanize.ParseBytes(string(text))
if err != nil {
return fmt.Errorf("invalid size: %s", string(text))
}

if math.MaxUint64/mult < size {
return fmt.Errorf("size would overflow the max size (%d) of a uint: %s", uint64(math.MaxUint64), string(text))
return err
}
*s = Size(v)

size *= mult

*s = Size(size)
return nil
}

Expand Down
38 changes: 19 additions & 19 deletions toml/toml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,41 +22,41 @@ func TestSize_UnmarshalText(t *testing.T) {
{"1", 1},
{"10", 10},
{"100", 100},
{"1k", 1 << 10},
{"10k", 10 << 10},
{"100k", 100 << 10},
{"1K", 1 << 10},
{"10K", 10 << 10},
{"100K", 100 << 10},
{"1m", 1 << 20},
{"10m", 10 << 20},
{"100m", 100 << 20},
{"1M", 1 << 20},
{"10M", 10 << 20},
{"100M", 100 << 20},
{"1g", 1 << 30},
{"1G", 1 << 30},
{fmt.Sprint(uint64(math.MaxUint64) - 1), math.MaxUint64 - 1},
{"1kib", 1 << 10},
{"10kib", 10 << 10},
{"100kib", 100 << 10},
{"1Kib", 1 << 10},
{"10Kib", 10 << 10},
{"100Kib", 100 << 10},
{"1mib", 1 << 20},
{"10mib", 10 << 20},
{"100mib", 100 << 20},
{"1Mib", 1 << 20},
{"10Mib", 10 << 20},
{"100Mib", 100 << 20},
{"1gib", 1 << 30},
{"1Gib", 1 << 30},
{"100Gib", 100 << 30},
{"1tib", 1 << 40},
} {
if err := s.UnmarshalText([]byte(test.str)); err != nil {
t.Fatalf("unexpected error: %s", err)
t.Errorf("unexpected error: %s", err)
}
if s != itoml.Size(test.want) {
t.Fatalf("wanted: %d got: %d", test.want, s)
t.Errorf("wanted: %d got: %d", test.want, s)
}
}

for _, str := range []string{
fmt.Sprintf("%dk", uint64(math.MaxUint64-1)),
"10000000000000000000g",
"abcdef",
"1KB",
"√m",
"a1",
"",
} {
if err := s.UnmarshalText([]byte(str)); err == nil {
t.Fatalf("input should have failed: %s", str)
t.Errorf("input should have failed: %s", str)
}
}
}
Expand Down
4 changes: 2 additions & 2 deletions tsdb/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,8 @@ func TestConfig_HumanReadableSizes(t *testing.T) {
dir = "/var/lib/influxdb/data"
wal-dir = "/var/lib/influxdb/wal"
wal-fsync-delay = "10s"
cache-max-memory-size = "5g"
cache-snapshot-memory-size = "100m"
cache-max-memory-size = "5gib"
cache-snapshot-memory-size = "100mib"
`, &c); err != nil {
t.Fatal(err)
}
Expand Down