Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

boost: load query filters configs and check for boost #12

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion go/cmd/vtcombo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ var (

mysqlPort = flag.Int("mysql_port", 3306, "mysql port")

configPath = flag.String("boost_configs_path", "./boost_query_configs.yaml", "path to the boost query configurations file")

ts *topo.Server
resilientServer *srvtopo.ResilientServer
)
Expand Down Expand Up @@ -214,7 +216,7 @@ func main() {
vtgate.QueryLogHandler = "/debug/vtgate/querylog"
vtgate.QueryLogzHandler = "/debug/vtgate/querylogz"
vtgate.QueryzHandler = "/debug/vtgate/queryz"
vtg := vtgate.Init(context.Background(), resilientServer, tpb.Cells[0], tabletTypesToWait)
vtg := vtgate.Init(context.Background(), resilientServer, tpb.Cells[0], tabletTypesToWait, configPath)

// vtctld configuration and init
vtctld.InitVtctld(ts)
Expand Down
4 changes: 3 additions & 1 deletion go/cmd/vtgate/vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ var (
tabletTypesToWait = flag.String("tablet_types_to_wait", "", "wait till connected for specified tablet types during Gateway initialization")
)

var configPath = flag.String("boost_configs_path", "./boost_query_configs.yaml", "path to the boost query configurations file")

var resilientServer *srvtopo.ResilientServer
var legacyHealthCheck discovery.LegacyHealthCheck

Expand Down Expand Up @@ -153,7 +155,7 @@ func main() {
vtg = vtgate.LegacyInit(context.Background(), legacyHealthCheck, resilientServer, *cell, *vtgate.RetryCount, tabletTypes)
} else {
// use new Init otherwise
vtg = vtgate.Init(context.Background(), resilientServer, *cell, tabletTypes)
vtg = vtgate.Init(context.Background(), resilientServer, *cell, tabletTypes, configPath)
}

servenv.OnRun(func() {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtexplain/vtexplain_vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func initVtgateExecutor(vSchemaStr, ksShardMapStr string, opts *Options) error {

streamSize := 10
var schemaTracker vtgate.SchemaInfo // no schema tracker for these tests
vtgateExecutor = vtgate.NewExecutor(context.Background(), explainTopo, vtexplainCell, resolver, opts.Normalize, false /*do not warn for sharded only*/, streamSize, cache.DefaultConfig, schemaTracker)
vtgateExecutor = vtgate.NewExecutor(context.Background(), explainTopo, vtexplainCell, resolver, opts.Normalize, false /*do not warn for sharded only*/, streamSize, cache.DefaultConfig, schemaTracker, nil, nil)

queryLogBufferSize := 10
vtgate.QueryLogger = streamlog.New("VTGate", queryLogBufferSize)
Expand Down
21 changes: 19 additions & 2 deletions go/vt/vtgate/boost/boost.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,23 @@ package boost
type Columns map[string]string

type PlanConfig struct {
IsBoosted bool
BoostColumns Columns
IsBoosted bool
Columns Columns
Table string
}

func NonBoostedPlanConfig() *PlanConfig {
return &PlanConfig{
IsBoosted: false,
Columns: Columns{},
}
}

type QueryFilterConfig struct {
Columns []string `yaml:"columns"`
TableName string `yaml:"tableName"`
}

type QueryFilterConfigs struct {
BoostConfigs []QueryFilterConfig `yaml:"boostConfigs"`
}
43 changes: 43 additions & 0 deletions go/vt/vtgate/boost/load_configs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package boost

import (
"fmt"
"gopkg.in/yaml.v2"
"io/ioutil"
)

func Load(configPath *string) (*QueryFilterConfigs, error) {
data, err := ioutil.ReadFile(*configPath)
fmt.Printf("Reading Boost Configs from: %s\n", *configPath)
if err != nil {
fmt.Printf("Error reading file: %v\n", err)
return nil, err
}

var configs QueryFilterConfigs
err = yaml.Unmarshal(data, &configs)
if err != nil {
return nil, err
}

fmt.Printf("Parsed Configs: %+v\n", configs)

rocket := `
|
/ \
/ _ \
|.'''.|
|'._.'|
|BOOST|
,'| | |\.
/ | | | \
|,-'--|--'-.|
|| || ||
`

fmt.Println(rocket)

fmt.Println("Boost enabled.")

return &configs, nil
}
70 changes: 40 additions & 30 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"strings"
"sync"
"time"
"vitess.io/vitess/go/cache/redis"
"vitess.io/vitess/go/vt/vtgate/boost"

"vitess.io/vitess/go/acl"
Expand Down Expand Up @@ -101,6 +102,9 @@ type Executor struct {

vm *VSchemaManager
schemaTracker SchemaInfo

queryFilterConfigs *boost.QueryFilterConfigs
boostCache *redis.Cache
}

var executorOnce sync.Once
Expand All @@ -119,18 +123,22 @@ func NewExecutor(
streamSize int,
cacheCfg *cache.Config,
schemaTracker SchemaInfo,
queryFilterConfigs *boost.QueryFilterConfigs,
boostCache *redis.Cache,
) *Executor {
e := &Executor{
serv: serv,
cell: cell,
resolver: resolver,
scatterConn: resolver.scatterConn,
txConn: resolver.scatterConn.txConn,
plans: cache.NewDefaultCacheImpl(cacheCfg),
normalize: normalize,
warnShardedOnly: warnOnShardedOnly,
streamSize: streamSize,
schemaTracker: schemaTracker,
serv: serv,
cell: cell,
resolver: resolver,
scatterConn: resolver.scatterConn,
txConn: resolver.scatterConn.txConn,
plans: cache.NewDefaultCacheImpl(cacheCfg),
normalize: normalize,
warnShardedOnly: warnOnShardedOnly,
streamSize: streamSize,
schemaTracker: schemaTracker,
queryFilterConfigs: queryFilterConfigs,
boostCache: boostCache,
}

vschemaacl.Init()
Expand Down Expand Up @@ -1202,7 +1210,7 @@ func (e *Executor) getPlan(vcursor *vcursorImpl, sql string, comments sqlparser.
}
ignoreMaxMemoryRows := sqlparser.IgnoreMaxMaxMemoryRowsDirective(stmt)
vcursor.SetIgnoreMaxMemoryRows(ignoreMaxMemoryRows)
var boostPlanConfig *boost.PlanConfig
var boostPlanConfig = boost.NonBoostedPlanConfig()

// Normalize if possible and retry.
if (e.normalize && sqlparser.CanNormalize(stmt)) || sqlparser.MustRewriteAST(stmt) {
Expand All @@ -1214,7 +1222,7 @@ func (e *Executor) getPlan(vcursor *vcursorImpl, sql string, comments sqlparser.
statement = result.AST
bindVarNeeds = result.BindVarNeeds
query = sqlparser.String(statement)
boostPlanConfig = configForBoost(result.Columns, "<TODO>")
boostPlanConfig = configForBoost(e.queryFilterConfigs, result.Columns, vcursor.vschema.UniqueTables)
}

if logStats != nil {
Expand Down Expand Up @@ -1242,30 +1250,32 @@ func (e *Executor) getPlan(vcursor *vcursorImpl, sql string, comments sqlparser.
return plan, nil
}

// TODO
func configForBoost(columns boost.Columns, table string) *boost.PlanConfig {
configColumns := map[string]string{
"user_id": "1337",
func configForBoost(configs *boost.QueryFilterConfigs, columns boost.Columns, inputMap map[string]*vindexes.Table) *boost.PlanConfig {
if configs == nil || columns == nil {
return boost.NonBoostedPlanConfig()
}

//todo compare sets for ordering
if !keysMatch(columns, configColumns) {
return &boost.PlanConfig{}
}
for tableName, _ := range inputMap {
for _, filterConfig := range configs.BoostConfigs {
if tableName == filterConfig.TableName {

return &boost.PlanConfig{
IsBoosted: true,
BoostColumns: columns,
if keysMatch(columns, filterConfig.Columns) {
return &boost.PlanConfig{
IsBoosted: true,
Columns: columns,
Table: tableName,
}
}
}
}
}
}

func keysMatch(map1, map2 map[string]string) bool {
if len(map1) != len(map2) {
return false
}
return boost.NonBoostedPlanConfig()
}

for k := range map1 {
if _, exists := map2[k]; !exists {
func keysMatch(columns map[string]string, keys []string) bool {
for _, k := range keys {
if _, exists := columns[k]; !exists {
return false
}
}
Expand Down
37 changes: 36 additions & 1 deletion go/vt/vtgate/plan_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ package vtgate

import (
"context"
"fmt"
"time"
"vitess.io/vitess/go/cache/redis"
"vitess.io/vitess/go/vt/vtgate/boost"

"vitess.io/vitess/go/sqltypes"
querypb "vitess.io/vitess/go/vt/proto/query"
Expand Down Expand Up @@ -121,7 +124,17 @@ func (e *Executor) newExecute(ctx context.Context, safeSession *SafeSession, sql
}

// Check if boosted and hit Redis
// plan.BoostPlanConfig.IsBoosted == true

if plan.BoostPlanConfig != nil && plan.BoostPlanConfig.IsBoosted {
cacheKey := cacheKey(plan.BoostPlanConfig, bindVars)

fmt.Println("Cache Key: ", cacheKey)

redisResults, err := e.boostCache.Get(cacheKey)

fmt.Println("Redis Results: ", redisResults)
fmt.Println("Error: ", err)
}

statementTypeResult, sqlResult, err := e.executePlan(ctx, plan, vcursor, bindVars, execStart)(logStats, safeSession)

Expand All @@ -130,6 +143,28 @@ func (e *Executor) newExecute(ctx context.Context, safeSession *SafeSession, sql
return statementTypeResult, sqlResult, err
}

func cacheKey(config *boost.PlanConfig, vars map[string]*querypb.BindVariable) string {
return redis.GenerateCacheKey(cacheKeyParams(config, vars)...)
}

func cacheKeyParams(boostConfig *boost.PlanConfig, vars map[string]*querypb.BindVariable) []string {
var allColumns []string
var allValues []string

for key, vtgValueKey := range boostConfig.Columns {
allColumns = append(allColumns, key)

var byteArray = vars[vtgValueKey].Value
var stringValue = string(byteArray)

allValues = append(allValues, stringValue)
}

tail := append(allColumns, allValues...)

return append([]string{boostConfig.Table}, tail...)
}

func (e *Executor) startTxIfNecessary(ctx context.Context, safeSession *SafeSession) error {
if !safeSession.Autocommit && !safeSession.InTransaction() {
if err := e.txConn.Begin(ctx, safeSession); err != nil {
Expand Down
18 changes: 9 additions & 9 deletions go/vt/vtgate/vindexes/vschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ const (
// used for building routing plans.
type VSchema struct {
RoutingRules map[string]*RoutingRule `json:"routing_rules"`
uniqueTables map[string]*Table
UniqueTables map[string]*Table
uniqueVindexes map[string]Vindex
Keyspaces map[string]*KeyspaceSchema `json:"keyspaces"`
}
Expand Down Expand Up @@ -165,7 +165,7 @@ type AutoIncrement struct {
func BuildVSchema(source *vschemapb.SrvVSchema) (vschema *VSchema) {
vschema = &VSchema{
RoutingRules: make(map[string]*RoutingRule),
uniqueTables: make(map[string]*Table),
UniqueTables: make(map[string]*Table),
uniqueVindexes: make(map[string]Vindex),
Keyspaces: make(map[string]*KeyspaceSchema),
}
Expand All @@ -189,7 +189,7 @@ func BuildKeyspaceSchema(input *vschemapb.Keyspace, keyspace string) (*KeyspaceS
},
}
vschema := &VSchema{
uniqueTables: make(map[string]*Table),
UniqueTables: make(map[string]*Table),
uniqueVindexes: make(map[string]Vindex),
Keyspaces: make(map[string]*KeyspaceSchema),
}
Expand Down Expand Up @@ -335,10 +335,10 @@ func buildTables(ks *vschemapb.Keyspace, vschema *VSchema, ksvschema *KeyspaceSc
// Add the table to the map entries.
// If the keyspace requires explicit routing, don't include it in global routing
if !ks.RequireExplicitRouting {
if _, ok := vschema.uniqueTables[tname]; ok {
vschema.uniqueTables[tname] = nil
if _, ok := vschema.UniqueTables[tname]; ok {
vschema.UniqueTables[tname] = nil
} else {
vschema.uniqueTables[tname] = t
vschema.UniqueTables[tname] = t
}
}
ksvschema.Tables[tname] = t
Expand All @@ -362,7 +362,7 @@ func resolveAutoIncrement(source *vschemapb.SrvVSchema, vschema *VSchema) {
if err != nil {
// Better to remove the table than to leave it partially initialized.
delete(ksvschema.Tables, tname)
delete(vschema.uniqueTables, tname)
delete(vschema.UniqueTables, tname)
ksvschema.Error = fmt.Errorf("cannot resolve sequence %s: %v", table.AutoIncrement.Sequence, err)
continue
}
Expand Down Expand Up @@ -391,7 +391,7 @@ func addDual(vschema *VSchema) {
// the keyspaces. For consistency, we'll always use the
// first keyspace by lexical ordering.
first = ksname
vschema.uniqueTables["dual"] = t
vschema.UniqueTables["dual"] = t
}
}
}
Expand Down Expand Up @@ -464,7 +464,7 @@ func (vschema *VSchema) FindTable(keyspace, tablename string) (*Table, error) {
// findTable is like FindTable, but does not return an error if a table is not found.
func (vschema *VSchema) findTable(keyspace, tablename string) (*Table, error) {
if keyspace == "" {
table, ok := vschema.uniqueTables[tablename]
table, ok := vschema.UniqueTables[tablename]
if table == nil {
if ok {
return nil, fmt.Errorf("ambiguous table reference: %s", tablename)
Expand Down
Loading
Loading