diff --git a/go/cmd/vtcombo/main.go b/go/cmd/vtcombo/main.go index f425ce40761..d1757acdb77 100644 --- a/go/cmd/vtcombo/main.go +++ b/go/cmd/vtcombo/main.go @@ -59,6 +59,8 @@ var ( mysqlPort = flag.Int("mysql_port", 3306, "mysql port") + configPath = flag.String("boost_configs_path", "./boost_query_configs.yaml", "path to the boost query configurations file") + ts *topo.Server resilientServer *srvtopo.ResilientServer ) @@ -214,7 +216,7 @@ func main() { vtgate.QueryLogHandler = "/debug/vtgate/querylog" vtgate.QueryLogzHandler = "/debug/vtgate/querylogz" vtgate.QueryzHandler = "/debug/vtgate/queryz" - vtg := vtgate.Init(context.Background(), resilientServer, tpb.Cells[0], tabletTypesToWait) + vtg := vtgate.Init(context.Background(), resilientServer, tpb.Cells[0], tabletTypesToWait, configPath) // vtctld configuration and init vtctld.InitVtctld(ts) diff --git a/go/cmd/vtgate/vtgate.go b/go/cmd/vtgate/vtgate.go index 3f98b7d6212..fb59a51ceb1 100644 --- a/go/cmd/vtgate/vtgate.go +++ b/go/cmd/vtgate/vtgate.go @@ -44,6 +44,8 @@ var ( tabletTypesToWait = flag.String("tablet_types_to_wait", "", "wait till connected for specified tablet types during Gateway initialization") ) +var configPath = flag.String("boost_configs_path", "./boost_query_configs.yaml", "path to the boost query configurations file") + var resilientServer *srvtopo.ResilientServer var legacyHealthCheck discovery.LegacyHealthCheck @@ -153,7 +155,7 @@ func main() { vtg = vtgate.LegacyInit(context.Background(), legacyHealthCheck, resilientServer, *cell, *vtgate.RetryCount, tabletTypes) } else { // use new Init otherwise - vtg = vtgate.Init(context.Background(), resilientServer, *cell, tabletTypes) + vtg = vtgate.Init(context.Background(), resilientServer, *cell, tabletTypes, configPath) } servenv.OnRun(func() { diff --git a/go/vt/vtexplain/vtexplain_vtgate.go b/go/vt/vtexplain/vtexplain_vtgate.go index 748777c2b01..b3b48442aef 100644 --- a/go/vt/vtexplain/vtexplain_vtgate.go +++ b/go/vt/vtexplain/vtexplain_vtgate.go @@ -71,7 +71,7 @@ func initVtgateExecutor(vSchemaStr, ksShardMapStr string, opts *Options) error { streamSize := 10 var schemaTracker vtgate.SchemaInfo // no schema tracker for these tests - vtgateExecutor = vtgate.NewExecutor(context.Background(), explainTopo, vtexplainCell, resolver, opts.Normalize, false /*do not warn for sharded only*/, streamSize, cache.DefaultConfig, schemaTracker) + vtgateExecutor = vtgate.NewExecutor(context.Background(), explainTopo, vtexplainCell, resolver, opts.Normalize, false /*do not warn for sharded only*/, streamSize, cache.DefaultConfig, schemaTracker, nil, nil) queryLogBufferSize := 10 vtgate.QueryLogger = streamlog.New("VTGate", queryLogBufferSize) diff --git a/go/vt/vtgate/boost/boost.go b/go/vt/vtgate/boost/boost.go index 51fcd510503..004b8e33adc 100644 --- a/go/vt/vtgate/boost/boost.go +++ b/go/vt/vtgate/boost/boost.go @@ -3,6 +3,23 @@ package boost type Columns map[string]string type PlanConfig struct { - IsBoosted bool - BoostColumns Columns + IsBoosted bool + Columns Columns + Table string +} + +func NonBoostedPlanConfig() *PlanConfig { + return &PlanConfig{ + IsBoosted: false, + Columns: Columns{}, + } +} + +type QueryFilterConfig struct { + Columns []string `yaml:"columns"` + TableName string `yaml:"tableName"` +} + +type QueryFilterConfigs struct { + BoostConfigs []QueryFilterConfig `yaml:"boostConfigs"` } diff --git a/go/vt/vtgate/boost/load_configs.go b/go/vt/vtgate/boost/load_configs.go new file mode 100644 index 00000000000..461b1670b24 --- /dev/null +++ b/go/vt/vtgate/boost/load_configs.go @@ -0,0 +1,43 @@ +package boost + +import ( + "fmt" + "gopkg.in/yaml.v2" + "io/ioutil" +) + +func Load(configPath *string) (*QueryFilterConfigs, error) { + data, err := ioutil.ReadFile(*configPath) + fmt.Printf("Reading Boost Configs from: %s\n", *configPath) + if err != nil { + fmt.Printf("Error reading file: %v\n", err) + return nil, err + } + + var configs QueryFilterConfigs + err = yaml.Unmarshal(data, &configs) + if err != nil { + return nil, err + } + + fmt.Printf("Parsed Configs: %+v\n", configs) + + rocket := ` + | + / \ + / _ \ + |.'''.| + |'._.'| + |BOOST| + ,'| | |\. + / | | | \ + |,-'--|--'-.| + || || || +` + + fmt.Println(rocket) + + fmt.Println("Boost enabled.") + + return &configs, nil +} diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index 0b5bc069483..cab95fea781 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -28,6 +28,7 @@ import ( "strings" "sync" "time" + "vitess.io/vitess/go/cache/redis" "vitess.io/vitess/go/vt/vtgate/boost" "vitess.io/vitess/go/acl" @@ -101,6 +102,9 @@ type Executor struct { vm *VSchemaManager schemaTracker SchemaInfo + + queryFilterConfigs *boost.QueryFilterConfigs + boostCache *redis.Cache } var executorOnce sync.Once @@ -119,18 +123,22 @@ func NewExecutor( streamSize int, cacheCfg *cache.Config, schemaTracker SchemaInfo, + queryFilterConfigs *boost.QueryFilterConfigs, + boostCache *redis.Cache, ) *Executor { e := &Executor{ - serv: serv, - cell: cell, - resolver: resolver, - scatterConn: resolver.scatterConn, - txConn: resolver.scatterConn.txConn, - plans: cache.NewDefaultCacheImpl(cacheCfg), - normalize: normalize, - warnShardedOnly: warnOnShardedOnly, - streamSize: streamSize, - schemaTracker: schemaTracker, + serv: serv, + cell: cell, + resolver: resolver, + scatterConn: resolver.scatterConn, + txConn: resolver.scatterConn.txConn, + plans: cache.NewDefaultCacheImpl(cacheCfg), + normalize: normalize, + warnShardedOnly: warnOnShardedOnly, + streamSize: streamSize, + schemaTracker: schemaTracker, + queryFilterConfigs: queryFilterConfigs, + boostCache: boostCache, } vschemaacl.Init() @@ -1202,7 +1210,7 @@ func (e *Executor) getPlan(vcursor *vcursorImpl, sql string, comments sqlparser. } ignoreMaxMemoryRows := sqlparser.IgnoreMaxMaxMemoryRowsDirective(stmt) vcursor.SetIgnoreMaxMemoryRows(ignoreMaxMemoryRows) - var boostPlanConfig *boost.PlanConfig + var boostPlanConfig = boost.NonBoostedPlanConfig() // Normalize if possible and retry. if (e.normalize && sqlparser.CanNormalize(stmt)) || sqlparser.MustRewriteAST(stmt) { @@ -1214,7 +1222,7 @@ func (e *Executor) getPlan(vcursor *vcursorImpl, sql string, comments sqlparser. statement = result.AST bindVarNeeds = result.BindVarNeeds query = sqlparser.String(statement) - boostPlanConfig = configForBoost(result.Columns, "") + boostPlanConfig = configForBoost(e.queryFilterConfigs, result.Columns, vcursor.vschema.UniqueTables) } if logStats != nil { @@ -1242,30 +1250,32 @@ func (e *Executor) getPlan(vcursor *vcursorImpl, sql string, comments sqlparser. return plan, nil } -// TODO -func configForBoost(columns boost.Columns, table string) *boost.PlanConfig { - configColumns := map[string]string{ - "user_id": "1337", +func configForBoost(configs *boost.QueryFilterConfigs, columns boost.Columns, inputMap map[string]*vindexes.Table) *boost.PlanConfig { + if configs == nil || columns == nil { + return boost.NonBoostedPlanConfig() } - //todo compare sets for ordering - if !keysMatch(columns, configColumns) { - return &boost.PlanConfig{} - } + for tableName, _ := range inputMap { + for _, filterConfig := range configs.BoostConfigs { + if tableName == filterConfig.TableName { - return &boost.PlanConfig{ - IsBoosted: true, - BoostColumns: columns, + if keysMatch(columns, filterConfig.Columns) { + return &boost.PlanConfig{ + IsBoosted: true, + Columns: columns, + Table: tableName, + } + } + } + } } -} -func keysMatch(map1, map2 map[string]string) bool { - if len(map1) != len(map2) { - return false - } + return boost.NonBoostedPlanConfig() +} - for k := range map1 { - if _, exists := map2[k]; !exists { +func keysMatch(columns map[string]string, keys []string) bool { + for _, k := range keys { + if _, exists := columns[k]; !exists { return false } } diff --git a/go/vt/vtgate/plan_execute.go b/go/vt/vtgate/plan_execute.go index f19c392809f..c2c21886410 100644 --- a/go/vt/vtgate/plan_execute.go +++ b/go/vt/vtgate/plan_execute.go @@ -18,7 +18,10 @@ package vtgate import ( "context" + "fmt" "time" + "vitess.io/vitess/go/cache/redis" + "vitess.io/vitess/go/vt/vtgate/boost" "vitess.io/vitess/go/sqltypes" querypb "vitess.io/vitess/go/vt/proto/query" @@ -121,7 +124,17 @@ func (e *Executor) newExecute(ctx context.Context, safeSession *SafeSession, sql } // Check if boosted and hit Redis - // plan.BoostPlanConfig.IsBoosted == true + + if plan.BoostPlanConfig != nil && plan.BoostPlanConfig.IsBoosted { + cacheKey := cacheKey(plan.BoostPlanConfig, bindVars) + + fmt.Println("Cache Key: ", cacheKey) + + redisResults, err := e.boostCache.Get(cacheKey) + + fmt.Println("Redis Results: ", redisResults) + fmt.Println("Error: ", err) + } statementTypeResult, sqlResult, err := e.executePlan(ctx, plan, vcursor, bindVars, execStart)(logStats, safeSession) @@ -130,6 +143,28 @@ func (e *Executor) newExecute(ctx context.Context, safeSession *SafeSession, sql return statementTypeResult, sqlResult, err } +func cacheKey(config *boost.PlanConfig, vars map[string]*querypb.BindVariable) string { + return redis.GenerateCacheKey(cacheKeyParams(config, vars)...) +} + +func cacheKeyParams(boostConfig *boost.PlanConfig, vars map[string]*querypb.BindVariable) []string { + var allColumns []string + var allValues []string + + for key, vtgValueKey := range boostConfig.Columns { + allColumns = append(allColumns, key) + + var byteArray = vars[vtgValueKey].Value + var stringValue = string(byteArray) + + allValues = append(allValues, stringValue) + } + + tail := append(allColumns, allValues...) + + return append([]string{boostConfig.Table}, tail...) +} + func (e *Executor) startTxIfNecessary(ctx context.Context, safeSession *SafeSession) error { if !safeSession.Autocommit && !safeSession.InTransaction() { if err := e.txConn.Begin(ctx, safeSession); err != nil { diff --git a/go/vt/vtgate/vindexes/vschema.go b/go/vt/vtgate/vindexes/vschema.go index 53ed49ad870..81a7d78e015 100644 --- a/go/vt/vtgate/vindexes/vschema.go +++ b/go/vt/vtgate/vindexes/vschema.go @@ -58,7 +58,7 @@ const ( // used for building routing plans. type VSchema struct { RoutingRules map[string]*RoutingRule `json:"routing_rules"` - uniqueTables map[string]*Table + UniqueTables map[string]*Table uniqueVindexes map[string]Vindex Keyspaces map[string]*KeyspaceSchema `json:"keyspaces"` } @@ -165,7 +165,7 @@ type AutoIncrement struct { func BuildVSchema(source *vschemapb.SrvVSchema) (vschema *VSchema) { vschema = &VSchema{ RoutingRules: make(map[string]*RoutingRule), - uniqueTables: make(map[string]*Table), + UniqueTables: make(map[string]*Table), uniqueVindexes: make(map[string]Vindex), Keyspaces: make(map[string]*KeyspaceSchema), } @@ -189,7 +189,7 @@ func BuildKeyspaceSchema(input *vschemapb.Keyspace, keyspace string) (*KeyspaceS }, } vschema := &VSchema{ - uniqueTables: make(map[string]*Table), + UniqueTables: make(map[string]*Table), uniqueVindexes: make(map[string]Vindex), Keyspaces: make(map[string]*KeyspaceSchema), } @@ -335,10 +335,10 @@ func buildTables(ks *vschemapb.Keyspace, vschema *VSchema, ksvschema *KeyspaceSc // Add the table to the map entries. // If the keyspace requires explicit routing, don't include it in global routing if !ks.RequireExplicitRouting { - if _, ok := vschema.uniqueTables[tname]; ok { - vschema.uniqueTables[tname] = nil + if _, ok := vschema.UniqueTables[tname]; ok { + vschema.UniqueTables[tname] = nil } else { - vschema.uniqueTables[tname] = t + vschema.UniqueTables[tname] = t } } ksvschema.Tables[tname] = t @@ -362,7 +362,7 @@ func resolveAutoIncrement(source *vschemapb.SrvVSchema, vschema *VSchema) { if err != nil { // Better to remove the table than to leave it partially initialized. delete(ksvschema.Tables, tname) - delete(vschema.uniqueTables, tname) + delete(vschema.UniqueTables, tname) ksvschema.Error = fmt.Errorf("cannot resolve sequence %s: %v", table.AutoIncrement.Sequence, err) continue } @@ -391,7 +391,7 @@ func addDual(vschema *VSchema) { // the keyspaces. For consistency, we'll always use the // first keyspace by lexical ordering. first = ksname - vschema.uniqueTables["dual"] = t + vschema.UniqueTables["dual"] = t } } } @@ -464,7 +464,7 @@ func (vschema *VSchema) FindTable(keyspace, tablename string) (*Table, error) { // findTable is like FindTable, but does not return an error if a table is not found. func (vschema *VSchema) findTable(keyspace, tablename string) (*Table, error) { if keyspace == "" { - table, ok := vschema.uniqueTables[tablename] + table, ok := vschema.UniqueTables[tablename] if table == nil { if ok { return nil, fmt.Errorf("ambiguous table reference: %s", tablename) diff --git a/go/vt/vtgate/vindexes/vschema_test.go b/go/vt/vtgate/vindexes/vschema_test.go index daf0708cc5f..660e8fb7fe3 100644 --- a/go/vt/vtgate/vindexes/vschema_test.go +++ b/go/vt/vtgate/vindexes/vschema_test.go @@ -223,7 +223,7 @@ func TestUnshardedVSchema(t *testing.T) { } want := &VSchema{ RoutingRules: map[string]*RoutingRule{}, - uniqueTables: map[string]*Table{ + UniqueTables: map[string]*Table{ "t1": t1, "dual": dual, }, @@ -285,7 +285,7 @@ func TestVSchemaColumns(t *testing.T) { } want := &VSchema{ RoutingRules: map[string]*RoutingRule{}, - uniqueTables: map[string]*Table{ + UniqueTables: map[string]*Table{ "t1": t1, "dual": dual, }, @@ -349,7 +349,7 @@ func TestVSchemaColumnListAuthoritative(t *testing.T) { } want := &VSchema{ RoutingRules: map[string]*RoutingRule{}, - uniqueTables: map[string]*Table{ + UniqueTables: map[string]*Table{ "t1": t1, "dual": dual, }, @@ -428,7 +428,7 @@ func TestVSchemaPinned(t *testing.T) { } want := &VSchema{ RoutingRules: map[string]*RoutingRule{}, - uniqueTables: map[string]*Table{ + UniqueTables: map[string]*Table{ "t1": t1, "dual": dual, }, @@ -530,7 +530,7 @@ func TestShardedVSchemaOwned(t *testing.T) { } want := &VSchema{ RoutingRules: map[string]*RoutingRule{}, - uniqueTables: map[string]*Table{ + UniqueTables: map[string]*Table{ "t1": t1, "dual": dual, }, @@ -778,7 +778,7 @@ func TestVSchemaRoutingRules(t *testing.T) { Error: errors.New("table t2 not found"), }, }, - uniqueTables: map[string]*Table{ + UniqueTables: map[string]*Table{ "t1": t1, "t2": t2, "dual": dual1, @@ -1195,7 +1195,7 @@ func TestShardedVSchemaMultiColumnVindex(t *testing.T) { } want := &VSchema{ RoutingRules: map[string]*RoutingRule{}, - uniqueTables: map[string]*Table{ + UniqueTables: map[string]*Table{ "t1": t1, "dual": dual, }, @@ -1293,7 +1293,7 @@ func TestShardedVSchemaNotOwned(t *testing.T) { } want := &VSchema{ RoutingRules: map[string]*RoutingRule{}, - uniqueTables: map[string]*Table{ + UniqueTables: map[string]*Table{ "t1": t1, "dual": dual, }, @@ -1425,7 +1425,7 @@ func TestBuildVSchemaDupSeq(t *testing.T) { } want := &VSchema{ RoutingRules: map[string]*RoutingRule{}, - uniqueTables: map[string]*Table{ + UniqueTables: map[string]*Table{ "t1": nil, "dual": duala, }, @@ -1498,7 +1498,7 @@ func TestBuildVSchemaDupTable(t *testing.T) { } want := &VSchema{ RoutingRules: map[string]*RoutingRule{}, - uniqueTables: map[string]*Table{ + UniqueTables: map[string]*Table{ "t1": nil, "dual": duala, }, @@ -1632,7 +1632,7 @@ func TestBuildVSchemaDupVindex(t *testing.T) { } want := &VSchema{ RoutingRules: map[string]*RoutingRule{}, - uniqueTables: map[string]*Table{ + UniqueTables: map[string]*Table{ "t1": nil, "dual": duala, }, @@ -1949,7 +1949,7 @@ func TestSequence(t *testing.T) { } want := &VSchema{ RoutingRules: map[string]*RoutingRule{}, - uniqueTables: map[string]*Table{ + UniqueTables: map[string]*Table{ "seq": seq, "t1": t1, "t2": t2, diff --git a/go/vt/vtgate/vtgate.go b/go/vt/vtgate/vtgate.go index ac8ec539def..022cc530704 100644 --- a/go/vt/vtgate/vtgate.go +++ b/go/vt/vtgate/vtgate.go @@ -25,6 +25,8 @@ import ( "os" "strings" "time" + "vitess.io/vitess/go/cache/redis" + "vitess.io/vitess/go/vt/vtgate/boost" "vitess.io/vitess/go/vt/key" @@ -157,7 +159,7 @@ type RegisterVTGate func(vtgateservice.VTGateService) var RegisterVTGates []RegisterVTGate // Init initializes VTGate server. -func Init(ctx context.Context, serv srvtopo.Server, cell string, tabletTypesToWait []topodatapb.TabletType) *VTGate { +func Init(ctx context.Context, serv srvtopo.Server, cell string, tabletTypesToWait []topodatapb.TabletType, boostQueryFilterConfigPath *string) *VTGate { if rpcVTGate != nil { log.Fatalf("VTGate already initialized") } @@ -214,7 +216,10 @@ func Init(ctx context.Context, serv srvtopo.Server, cell string, tabletTypesToWa LFU: *queryPlanCacheLFU, } - executor := NewExecutor(ctx, serv, cell, resolver, *normalizeQueries, *warnShardedOnly, *streamBufferSize, cacheCfg, si) + queryFilterConfigs, _ := boost.Load(boostQueryFilterConfigPath) + boostCache := redis.NewCache() + + executor := NewExecutor(ctx, serv, cell, resolver, *normalizeQueries, *warnShardedOnly, *streamBufferSize, cacheCfg, si, queryFilterConfigs, boostCache) // connect the schema tracker with the vschema manager if *enableSchemaChangeSignal { @@ -617,8 +622,11 @@ func LegacyInit(ctx context.Context, hc discovery.LegacyHealthCheck, serv srvtop LFU: *queryPlanCacheLFU, } + queryFilterConfigs := &boost.QueryFilterConfigs{} + boostCache := redis.NewCache() + rpcVTGate = &VTGate{ - executor: NewExecutor(ctx, serv, cell, resolver, *normalizeQueries, *warnShardedOnly, *streamBufferSize, cacheCfg, nil), + executor: NewExecutor(ctx, serv, cell, resolver, *normalizeQueries, *warnShardedOnly, *streamBufferSize, cacheCfg, nil, queryFilterConfigs, boostCache), resolver: resolver, vsm: vsm, txConn: tc,