Skip to content

Commit

Permalink
Merge pull request #12 from vinted/vinted/zygis/is-boosted-checks
Browse files Browse the repository at this point in the history
boost: load query filters configs and check for boost
  • Loading branch information
DeathBorn authored Mar 27, 2024
2 parents f25c065 + d3e8953 commit f22c593
Show file tree
Hide file tree
Showing 10 changed files with 177 additions and 60 deletions.
4 changes: 3 additions & 1 deletion go/cmd/vtcombo/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ var (

mysqlPort = flag.Int("mysql_port", 3306, "mysql port")

configPath = flag.String("boost_configs_path", "./boost_query_configs.yaml", "path to the boost query configurations file")

ts *topo.Server
resilientServer *srvtopo.ResilientServer
)
Expand Down Expand Up @@ -214,7 +216,7 @@ func main() {
vtgate.QueryLogHandler = "/debug/vtgate/querylog"
vtgate.QueryLogzHandler = "/debug/vtgate/querylogz"
vtgate.QueryzHandler = "/debug/vtgate/queryz"
vtg := vtgate.Init(context.Background(), resilientServer, tpb.Cells[0], tabletTypesToWait)
vtg := vtgate.Init(context.Background(), resilientServer, tpb.Cells[0], tabletTypesToWait, configPath)

// vtctld configuration and init
vtctld.InitVtctld(ts)
Expand Down
4 changes: 3 additions & 1 deletion go/cmd/vtgate/vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ var (
tabletTypesToWait = flag.String("tablet_types_to_wait", "", "wait till connected for specified tablet types during Gateway initialization")
)

var configPath = flag.String("boost_configs_path", "./boost_query_configs.yaml", "path to the boost query configurations file")

var resilientServer *srvtopo.ResilientServer
var legacyHealthCheck discovery.LegacyHealthCheck

Expand Down Expand Up @@ -153,7 +155,7 @@ func main() {
vtg = vtgate.LegacyInit(context.Background(), legacyHealthCheck, resilientServer, *cell, *vtgate.RetryCount, tabletTypes)
} else {
// use new Init otherwise
vtg = vtgate.Init(context.Background(), resilientServer, *cell, tabletTypes)
vtg = vtgate.Init(context.Background(), resilientServer, *cell, tabletTypes, configPath)
}

servenv.OnRun(func() {
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtexplain/vtexplain_vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func initVtgateExecutor(vSchemaStr, ksShardMapStr string, opts *Options) error {

streamSize := 10
var schemaTracker vtgate.SchemaInfo // no schema tracker for these tests
vtgateExecutor = vtgate.NewExecutor(context.Background(), explainTopo, vtexplainCell, resolver, opts.Normalize, false /*do not warn for sharded only*/, streamSize, cache.DefaultConfig, schemaTracker)
vtgateExecutor = vtgate.NewExecutor(context.Background(), explainTopo, vtexplainCell, resolver, opts.Normalize, false /*do not warn for sharded only*/, streamSize, cache.DefaultConfig, schemaTracker, nil, nil)

queryLogBufferSize := 10
vtgate.QueryLogger = streamlog.New("VTGate", queryLogBufferSize)
Expand Down
21 changes: 19 additions & 2 deletions go/vt/vtgate/boost/boost.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,23 @@ package boost
type Columns map[string]string

type PlanConfig struct {
IsBoosted bool
BoostColumns Columns
IsBoosted bool
Columns Columns
Table string
}

func NonBoostedPlanConfig() *PlanConfig {
return &PlanConfig{
IsBoosted: false,
Columns: Columns{},
}
}

type QueryFilterConfig struct {
Columns []string `yaml:"columns"`
TableName string `yaml:"tableName"`
}

type QueryFilterConfigs struct {
BoostConfigs []QueryFilterConfig `yaml:"boostConfigs"`
}
43 changes: 43 additions & 0 deletions go/vt/vtgate/boost/load_configs.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package boost

import (
"fmt"
"gopkg.in/yaml.v2"
"io/ioutil"
)

func Load(configPath *string) (*QueryFilterConfigs, error) {
data, err := ioutil.ReadFile(*configPath)
fmt.Printf("Reading Boost Configs from: %s\n", *configPath)
if err != nil {
fmt.Printf("Error reading file: %v\n", err)
return nil, err
}

var configs QueryFilterConfigs
err = yaml.Unmarshal(data, &configs)
if err != nil {
return nil, err
}

fmt.Printf("Parsed Configs: %+v\n", configs)

rocket := `
|
/ \
/ _ \
|.'''.|
|'._.'|
|BOOST|
,'| | |\.
/ | | | \
|,-'--|--'-.|
|| || ||
`

fmt.Println(rocket)

fmt.Println("Boost enabled.")

return &configs, nil
}
70 changes: 40 additions & 30 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"strings"
"sync"
"time"
"vitess.io/vitess/go/cache/redis"
"vitess.io/vitess/go/vt/vtgate/boost"

"vitess.io/vitess/go/acl"
Expand Down Expand Up @@ -101,6 +102,9 @@ type Executor struct {

vm *VSchemaManager
schemaTracker SchemaInfo

queryFilterConfigs *boost.QueryFilterConfigs
boostCache *redis.Cache
}

var executorOnce sync.Once
Expand All @@ -119,18 +123,22 @@ func NewExecutor(
streamSize int,
cacheCfg *cache.Config,
schemaTracker SchemaInfo,
queryFilterConfigs *boost.QueryFilterConfigs,
boostCache *redis.Cache,
) *Executor {
e := &Executor{
serv: serv,
cell: cell,
resolver: resolver,
scatterConn: resolver.scatterConn,
txConn: resolver.scatterConn.txConn,
plans: cache.NewDefaultCacheImpl(cacheCfg),
normalize: normalize,
warnShardedOnly: warnOnShardedOnly,
streamSize: streamSize,
schemaTracker: schemaTracker,
serv: serv,
cell: cell,
resolver: resolver,
scatterConn: resolver.scatterConn,
txConn: resolver.scatterConn.txConn,
plans: cache.NewDefaultCacheImpl(cacheCfg),
normalize: normalize,
warnShardedOnly: warnOnShardedOnly,
streamSize: streamSize,
schemaTracker: schemaTracker,
queryFilterConfigs: queryFilterConfigs,
boostCache: boostCache,
}

vschemaacl.Init()
Expand Down Expand Up @@ -1202,7 +1210,7 @@ func (e *Executor) getPlan(vcursor *vcursorImpl, sql string, comments sqlparser.
}
ignoreMaxMemoryRows := sqlparser.IgnoreMaxMaxMemoryRowsDirective(stmt)
vcursor.SetIgnoreMaxMemoryRows(ignoreMaxMemoryRows)
var boostPlanConfig *boost.PlanConfig
var boostPlanConfig = boost.NonBoostedPlanConfig()

// Normalize if possible and retry.
if (e.normalize && sqlparser.CanNormalize(stmt)) || sqlparser.MustRewriteAST(stmt) {
Expand All @@ -1214,7 +1222,7 @@ func (e *Executor) getPlan(vcursor *vcursorImpl, sql string, comments sqlparser.
statement = result.AST
bindVarNeeds = result.BindVarNeeds
query = sqlparser.String(statement)
boostPlanConfig = configForBoost(result.Columns, "<TODO>")
boostPlanConfig = configForBoost(e.queryFilterConfigs, result.Columns, vcursor.vschema.UniqueTables)
}

if logStats != nil {
Expand Down Expand Up @@ -1242,30 +1250,32 @@ func (e *Executor) getPlan(vcursor *vcursorImpl, sql string, comments sqlparser.
return plan, nil
}

// TODO
func configForBoost(columns boost.Columns, table string) *boost.PlanConfig {
configColumns := map[string]string{
"user_id": "1337",
func configForBoost(configs *boost.QueryFilterConfigs, columns boost.Columns, inputMap map[string]*vindexes.Table) *boost.PlanConfig {
if configs == nil || columns == nil {
return boost.NonBoostedPlanConfig()
}

//todo compare sets for ordering
if !keysMatch(columns, configColumns) {
return &boost.PlanConfig{}
}
for tableName, _ := range inputMap {
for _, filterConfig := range configs.BoostConfigs {
if tableName == filterConfig.TableName {

return &boost.PlanConfig{
IsBoosted: true,
BoostColumns: columns,
if keysMatch(columns, filterConfig.Columns) {
return &boost.PlanConfig{
IsBoosted: true,
Columns: columns,
Table: tableName,
}
}
}
}
}
}

func keysMatch(map1, map2 map[string]string) bool {
if len(map1) != len(map2) {
return false
}
return boost.NonBoostedPlanConfig()
}

for k := range map1 {
if _, exists := map2[k]; !exists {
func keysMatch(columns map[string]string, keys []string) bool {
for _, k := range keys {
if _, exists := columns[k]; !exists {
return false
}
}
Expand Down
37 changes: 36 additions & 1 deletion go/vt/vtgate/plan_execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,10 @@ package vtgate

import (
"context"
"fmt"
"time"
"vitess.io/vitess/go/cache/redis"
"vitess.io/vitess/go/vt/vtgate/boost"

"vitess.io/vitess/go/sqltypes"
querypb "vitess.io/vitess/go/vt/proto/query"
Expand Down Expand Up @@ -121,7 +124,17 @@ func (e *Executor) newExecute(ctx context.Context, safeSession *SafeSession, sql
}

// Check if boosted and hit Redis
// plan.BoostPlanConfig.IsBoosted == true

if plan.BoostPlanConfig != nil && plan.BoostPlanConfig.IsBoosted {
cacheKey := cacheKey(plan.BoostPlanConfig, bindVars)

fmt.Println("Cache Key: ", cacheKey)

redisResults, err := e.boostCache.Get(cacheKey)

fmt.Println("Redis Results: ", redisResults)
fmt.Println("Error: ", err)
}

statementTypeResult, sqlResult, err := e.executePlan(ctx, plan, vcursor, bindVars, execStart)(logStats, safeSession)

Expand All @@ -130,6 +143,28 @@ func (e *Executor) newExecute(ctx context.Context, safeSession *SafeSession, sql
return statementTypeResult, sqlResult, err
}

func cacheKey(config *boost.PlanConfig, vars map[string]*querypb.BindVariable) string {
return redis.GenerateCacheKey(cacheKeyParams(config, vars)...)
}

func cacheKeyParams(boostConfig *boost.PlanConfig, vars map[string]*querypb.BindVariable) []string {
var allColumns []string
var allValues []string

for key, vtgValueKey := range boostConfig.Columns {
allColumns = append(allColumns, key)

var byteArray = vars[vtgValueKey].Value
var stringValue = string(byteArray)

allValues = append(allValues, stringValue)
}

tail := append(allColumns, allValues...)

return append([]string{boostConfig.Table}, tail...)
}

func (e *Executor) startTxIfNecessary(ctx context.Context, safeSession *SafeSession) error {
if !safeSession.Autocommit && !safeSession.InTransaction() {
if err := e.txConn.Begin(ctx, safeSession); err != nil {
Expand Down
18 changes: 9 additions & 9 deletions go/vt/vtgate/vindexes/vschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ const (
// used for building routing plans.
type VSchema struct {
RoutingRules map[string]*RoutingRule `json:"routing_rules"`
uniqueTables map[string]*Table
UniqueTables map[string]*Table
uniqueVindexes map[string]Vindex
Keyspaces map[string]*KeyspaceSchema `json:"keyspaces"`
}
Expand Down Expand Up @@ -165,7 +165,7 @@ type AutoIncrement struct {
func BuildVSchema(source *vschemapb.SrvVSchema) (vschema *VSchema) {
vschema = &VSchema{
RoutingRules: make(map[string]*RoutingRule),
uniqueTables: make(map[string]*Table),
UniqueTables: make(map[string]*Table),
uniqueVindexes: make(map[string]Vindex),
Keyspaces: make(map[string]*KeyspaceSchema),
}
Expand All @@ -189,7 +189,7 @@ func BuildKeyspaceSchema(input *vschemapb.Keyspace, keyspace string) (*KeyspaceS
},
}
vschema := &VSchema{
uniqueTables: make(map[string]*Table),
UniqueTables: make(map[string]*Table),
uniqueVindexes: make(map[string]Vindex),
Keyspaces: make(map[string]*KeyspaceSchema),
}
Expand Down Expand Up @@ -335,10 +335,10 @@ func buildTables(ks *vschemapb.Keyspace, vschema *VSchema, ksvschema *KeyspaceSc
// Add the table to the map entries.
// If the keyspace requires explicit routing, don't include it in global routing
if !ks.RequireExplicitRouting {
if _, ok := vschema.uniqueTables[tname]; ok {
vschema.uniqueTables[tname] = nil
if _, ok := vschema.UniqueTables[tname]; ok {
vschema.UniqueTables[tname] = nil
} else {
vschema.uniqueTables[tname] = t
vschema.UniqueTables[tname] = t
}
}
ksvschema.Tables[tname] = t
Expand All @@ -362,7 +362,7 @@ func resolveAutoIncrement(source *vschemapb.SrvVSchema, vschema *VSchema) {
if err != nil {
// Better to remove the table than to leave it partially initialized.
delete(ksvschema.Tables, tname)
delete(vschema.uniqueTables, tname)
delete(vschema.UniqueTables, tname)
ksvschema.Error = fmt.Errorf("cannot resolve sequence %s: %v", table.AutoIncrement.Sequence, err)
continue
}
Expand Down Expand Up @@ -391,7 +391,7 @@ func addDual(vschema *VSchema) {
// the keyspaces. For consistency, we'll always use the
// first keyspace by lexical ordering.
first = ksname
vschema.uniqueTables["dual"] = t
vschema.UniqueTables["dual"] = t
}
}
}
Expand Down Expand Up @@ -464,7 +464,7 @@ func (vschema *VSchema) FindTable(keyspace, tablename string) (*Table, error) {
// findTable is like FindTable, but does not return an error if a table is not found.
func (vschema *VSchema) findTable(keyspace, tablename string) (*Table, error) {
if keyspace == "" {
table, ok := vschema.uniqueTables[tablename]
table, ok := vschema.UniqueTables[tablename]
if table == nil {
if ok {
return nil, fmt.Errorf("ambiguous table reference: %s", tablename)
Expand Down
Loading

0 comments on commit f22c593

Please sign in to comment.