Skip to content
This repository has been archived by the owner on Dec 16, 2022. It is now read-only.

slack-vitess-9 2021.06.07.r14 vtctld branch #225

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions go/cmd/vtadmin/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,9 @@ func main() {
rootCmd.Flags().Var(&clusterFileConfig, "cluster-config", "path to a yaml cluster configuration. see clusters.example.yaml") // (TODO:@amason) provide example config.
rootCmd.Flags().Var(&defaultClusterConfig, "cluster-defaults", "default options for all clusters")

rootCmd.Flags().AddGoFlag(flag.Lookup("tracer")) // defined in go/vt/trace
rootCmd.Flags().AddGoFlag(flag.Lookup("tracing-sampling-type")) // defined in go/vt/trace
rootCmd.Flags().AddGoFlag(flag.Lookup("tracing-sampling-rate")) // defined in go/vt/trace
rootCmd.Flags().BoolVar(&opts.EnableTracing, "grpc-tracing", false, "whether to enable tracing on the gRPC server")
rootCmd.Flags().BoolVar(&httpOpts.EnableTracing, "http-tracing", false, "whether to enable tracing on the HTTP server")
rootCmd.Flags().BoolVar(&httpOpts.DisableCompression, "http-no-compress", false, "whether to disable compression of HTTP API responses")
Expand Down
146 changes: 146 additions & 0 deletions go/flagutil/optional.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,146 @@
/*
Copyright 2021 The Vitess Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package flagutil

import (
"errors"
"flag"
"strconv"
)

// OptionalFlag augements the flag.Value interface with a method to determine
// if a flag was set explicitly on the comand-line.
//
// Though not part of the interface, because the return type would be different
// for each implementation, by convention, each implementation should define a
// Get() method to access the underlying value.
type OptionalFlag interface {
flag.Value
IsSet() bool
}

var (
_ OptionalFlag = (*OptionalFloat64)(nil)
_ OptionalFlag = (*OptionalString)(nil)
)

// OptionalFloat64 implements OptionalFlag for float64 values.
type OptionalFloat64 struct {
val float64
set bool
}

// NewOptionalFloat64 returns an OptionalFloat64 with the specified value as its
// starting value.
func NewOptionalFloat64(val float64) *OptionalFloat64 {
return &OptionalFloat64{
val: val,
set: false,
}
}

// Set is part of the flag.Value interface.
func (f *OptionalFloat64) Set(arg string) error {
v, err := strconv.ParseFloat(arg, 64)
if err != nil {
return numError(err)
}

f.val = v
f.set = true

return nil
}

// String is part of the flag.Value interface.
func (f *OptionalFloat64) String() string {
return strconv.FormatFloat(f.val, 'g', -1, 64)
}

// Get returns the underlying float64 value of this flag. If the flag was not
// explicitly set, this will be the initial value passed to the constructor.
func (f *OptionalFloat64) Get() float64 {
return f.val
}

// IsSet is part of the OptionalFlag interface.
func (f *OptionalFloat64) IsSet() bool {
return f.set
}

// OptionalString implements OptionalFlag for string values.
type OptionalString struct {
val string
set bool
}

// NewOptionalString returns an OptionalString with the specified value as its
// starting value.
func NewOptionalString(val string) *OptionalString {
return &OptionalString{
val: val,
set: false,
}
}

// Set is part of the flag.Value interface.
func (f *OptionalString) Set(arg string) error {
f.val = arg
f.set = true
return nil
}

// String is part of the flag.Value interface.
func (f *OptionalString) String() string {
return f.val
}

// Get returns the underlying string value of this flag. If the flag was not
// explicitly set, this will be the initial value passed to the constructor.
func (f *OptionalString) Get() string {
return f.val
}

// IsSet is part of the OptionalFlag interface.
func (f *OptionalString) IsSet() bool {
return f.set
}

// lifted directly from package flag to make the behavior of numeric parsing
// consistent with the standard library for our custom optional types.
var (
errParse = errors.New("parse error")
errRange = errors.New("value out of range")
)

// lifted directly from package flag to make the behavior of numeric parsing
// consistent with the standard library for our custom optional types.
func numError(err error) error {
ne, ok := err.(*strconv.NumError)
if !ok {
return err
}

switch ne.Err {
case strconv.ErrSyntax:
return errParse
case strconv.ErrRange:
return errRange
default:
return err
}
}
2 changes: 1 addition & 1 deletion go/trace/plugin_datadog.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func newDatadogTracer(serviceName string) (tracingService, io.Closer, error) {
ddtracer.WithAgentAddr(*dataDogHost+":"+*dataDogPort),
ddtracer.WithServiceName(serviceName),
ddtracer.WithDebugMode(true),
ddtracer.WithSampler(ddtracer.NewRateSampler(*samplingRate)),
ddtracer.WithSampler(ddtracer.NewRateSampler(samplingRate.Get())),
)

opentracing.SetGlobalTracer(t)
Expand Down
32 changes: 27 additions & 5 deletions go/trace/plugin_jaeger.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,13 @@ package trace
import (
"flag"
"io"
"os"

"github.com/opentracing/opentracing-go"
"github.com/uber/jaeger-client-go"
"github.com/uber/jaeger-client-go/config"

"vitess.io/vitess/go/flagutil"
"vitess.io/vitess/go/vt/log"
)

Expand All @@ -35,9 +37,15 @@ included but nothing Jaeger specific.

var (
agentHost = flag.String("jaeger-agent-host", "", "host and port to send spans to. if empty, no tracing will be done")
samplingRate = flag.Float64("tracing-sampling-rate", 0.1, "sampling rate for the probabilistic jaeger sampler")
samplingType = flagutil.NewOptionalString("const")
samplingRate = flagutil.NewOptionalFloat64(0.1)
)

func init() {
flag.Var(samplingType, "tracing-sampling-type", "sampling strategy to use for jaeger. possible values are 'const', 'probabilistic', 'rateLimiting', or 'remote'")
flag.Var(samplingRate, "tracing-sampling-rate", "sampling rate for the probabilistic jaeger sampler")
}

// newJagerTracerFromEnv will instantiate a tracingService implemented by Jaeger,
// taking configuration from environment variables. Available properties are:
// JAEGER_SERVICE_NAME -- If this is set, the service name used in code will be ignored and this value used instead
Expand Down Expand Up @@ -70,11 +78,25 @@ func newJagerTracerFromEnv(serviceName string) (tracingService, io.Closer, error
cfg.Reporter.LocalAgentHostPort = *agentHost
}
log.Infof("Tracing to: %v as %v", cfg.Reporter.LocalAgentHostPort, cfg.ServiceName)
cfg.Sampler = &config.SamplerConfig{
Type: jaeger.SamplerTypeConst,
Param: *samplingRate,

if os.Getenv("JAEGER_SAMPLER_PARAM") == "" {
// If the environment variable was not set, we take the flag regardless
// of whether it was explicitly set on the command line.
cfg.Sampler.Param = samplingRate.Get()
} else if samplingRate.IsSet() {
// If the environment variable was set, but the user also explicitly
// passed the command line flag, the flag takes precedence.
cfg.Sampler.Param = samplingRate.Get()
}
log.Infof("Tracing sampling rate: %v", *samplingRate)

if samplingType.IsSet() {
cfg.Sampler.Type = samplingType.Get()
} else if cfg.Sampler.Type == "" {
log.Infof("-tracing-sampler-type was not set, and JAEGER_SAMPLER_TYPE was not set, defaulting to const sampler")
cfg.Sampler.Type = jaeger.SamplerTypeConst
}

log.Infof("Tracing sampler type %v (param: %v)", cfg.Sampler.Type, cfg.Sampler.Param)

tracer, closer, err := cfg.NewTracer()

Expand Down
13 changes: 12 additions & 1 deletion go/vt/grpcclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ limitations under the License.
package grpcclient

import (
"context"
"flag"
"time"

Expand Down Expand Up @@ -58,6 +59,16 @@ func RegisterGRPCDialOptions(grpcDialOptionsFunc func(opts []grpc.DialOption) ([
// failFast is a non-optional parameter because callers are required to specify
// what that should be.
func Dial(target string, failFast FailFast, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
return DialContext(context.Background(), target, failFast, opts...)
}

// DialContext creates a grpc connection to the given target. Setup steps are
// covered by the context deadline, and, if WithBlock is specified in the dial
// options, connection establishment steps are covered by the context as well.
//
// failFast is a non-optional parameter because callers are required to specify
// what that should be.
func DialContext(ctx context.Context, target string, failFast FailFast, opts ...grpc.DialOption) (*grpc.ClientConn, error) {
grpccommon.EnableTracingOpt()
newopts := []grpc.DialOption{
grpc.WithDefaultCallOptions(
Expand Down Expand Up @@ -98,7 +109,7 @@ func Dial(target string, failFast FailFast, opts ...grpc.DialOption) (*grpc.Clie

newopts = append(newopts, interceptors()...)

return grpc.Dial(target, newopts...)
return grpc.DialContext(ctx, target, newopts...)
}

func interceptors() []grpc.DialOption {
Expand Down
72 changes: 72 additions & 0 deletions go/vt/key/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"bytes"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
"math"
"regexp"
Expand Down Expand Up @@ -364,3 +365,74 @@ var krRegexp = regexp.MustCompile(`^[0-9a-fA-F]*-[0-9a-fA-F]*$`)
func IsKeyRange(kr string) bool {
return krRegexp.MatchString(kr)
}

// GenerateShardRanges returns shard ranges assuming a keyspace with N shards.
func GenerateShardRanges(shards int) ([]string, error) {
var format string
var maxShards int

switch {
case shards <= 0:
return nil, errors.New("shards must be greater than zero")
case shards <= 256:
format = "%02x"
maxShards = 256
case shards <= 65536:
format = "%04x"
maxShards = 65536
default:
return nil, errors.New("this function does not support more than 65336 shards in a single keyspace")
}

rangeFormatter := func(start, end int) string {
var (
startKid string
endKid string
)

if start != 0 {
startKid = fmt.Sprintf(format, start)
}

if end != maxShards {
endKid = fmt.Sprintf(format, end)
}

return fmt.Sprintf("%s-%s", startKid, endKid)
}

start := 0
end := 0

// If shards does not divide evenly into maxShards, then there is some lossiness,
// where each shard is smaller than it should technically be (if, for example, size == 25.6).
// If we choose to keep everything in ints, then we have two choices:
// - Have every shard in #numshards be a uniform size, tack on an additional shard
// at the end of the range to account for the loss. This is bad because if you ask for
// 7 shards, you'll actually get 7 uniform shards with 1 small shard, for 8 total shards.
// It's also bad because one shard will have much different data distribution than the rest.
// - Expand the final shard to include whatever is left in the keyrange. This will give the
// correct number of shards, which is good, but depending on how lossy each individual shard is,
// you could end with that final shard being significantly larger than the rest of the shards,
// so this doesn't solve the data distribution problem.
//
// By tracking the "real" end (both in the real number sense, and in the truthfulness of the value sense),
// we can re-truncate the integer end on each iteration, which spreads the lossiness more
// evenly across the shards.
//
// This implementation has no impact on shard numbers that are powers of 2, even at large numbers,
// which you can see in the tests.
size := float64(maxShards) / float64(shards)
realEnd := float64(0)
shardRanges := make([]string, 0, shards)

for i := 1; i <= shards; i++ {
realEnd = float64(i) * size

end = int(realEnd)
shardRanges = append(shardRanges, rangeFormatter(start, end))
start = end
}

return shardRanges, nil
}
Loading