Skip to content

Commit

Permalink
Merge pull request #6737 from planetscale/upd-vindex
Browse files Browse the repository at this point in the history
Update Vindex only on changes
  • Loading branch information
harshit-gangal authored Sep 24, 2020
2 parents 0d201be + 107c000 commit d4bd3ba
Show file tree
Hide file tree
Showing 10 changed files with 196 additions and 98 deletions.
4 changes: 2 additions & 2 deletions go/test/endtoend/vtgate/concurrentdml/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -364,8 +364,8 @@ func TestUpdateLookupUniqueVindex(t *testing.T) {
exec(t, conn, `update t1 set c3 = 400 where c2 = 200`)
// changed - same vindex
exec(t, conn, `update t1 set c4 = 'abc' where c1 = 999`)
// not changed - same vindex - not yet supported bcoz of varchar field
// exec(t, conn, `update t1 set c4 = 'abc' where c4 = 'abc'`)
// not changed - same vindex
exec(t, conn, `update t1 set c4 = 'abc' where c4 = 'abc'`)

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ update user set pet='fido' where id=1
update user set name='alicia' where id=1

1 ks_sharded/-40: begin
1 ks_sharded/-40: select id, name from user where id = 1 limit 10001 for update
1 ks_sharded/-40: select id, name, name = 'alicia' from user where id = 1 limit 10001 for update
2 ks_sharded/40-80: begin
2 ks_sharded/40-80: delete from name_user_map where name = 'name_val_2' and user_id = 1 limit 10001
3 ks_sharded/c0-: begin
Expand All @@ -42,7 +42,7 @@ update user set name='alicia' where name='alice'
1 ks_sharded/40-80: begin
1 ks_sharded/40-80: select name, user_id from name_user_map where name in ('alice') limit 10001 for update
2 ks_sharded/-40: begin
2 ks_sharded/-40: select id, name from user where name = 'alice' limit 10001 for update
2 ks_sharded/-40: select id, name, name = 'alicia' from user where name = 'alice' limit 10001 for update
3 ks_sharded/40-80: delete from name_user_map where name = 'name_val_2' and user_id = 1 limit 10001
4 ks_sharded/c0-: begin
4 ks_sharded/c0-: insert into name_user_map(name, user_id) values ('alicia', 1)
Expand Down
3 changes: 3 additions & 0 deletions go/vt/vtexplain/vtexplain_vttablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -614,6 +614,9 @@ func inferColTypeFromExpr(node sqlparser.Expr, colTypeMap map[string]querypb.Typ
case *sqlparser.NullVal:
colNames = append(colNames, sqlparser.String(node))
colTypes = append(colTypes, querypb.Type_NULL_TYPE)
case *sqlparser.ComparisonExpr:
colNames = append(colNames, sqlparser.String(node))
colTypes = append(colTypes, querypb.Type_INT64)
default:
log.Errorf("vtexplain: unsupported select expression type +%v node %s", reflect.TypeOf(node), sqlparser.String(node))
}
Expand Down
6 changes: 3 additions & 3 deletions go/vt/vtgate/autocommit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,8 +84,8 @@ func TestAutocommitUpdateLookup(t *testing.T) {
func TestAutocommitUpdateVindexChange(t *testing.T) {
executor, sbc, _, sbclookup := createLegacyExecutorEnv()
sbc.SetResults([]*sqltypes.Result{sqltypes.MakeTestResult(
sqltypes.MakeTestFields("id|name|lastname", "int64|int32|varchar"),
"1|1|foo",
sqltypes.MakeTestFields("id|name|lastname|name_lastname_keyspace_id_map", "int64|int32|varchar|int64"),
"1|1|foo|0",
),
})

Expand All @@ -110,7 +110,7 @@ func TestAutocommitUpdateVindexChange(t *testing.T) {
testCommitCount(t, "sbclookup", sbclookup, 1)

testQueries(t, "sbc", sbc, []*querypb.BoundQuery{{
Sql: "select id, name, lastname from user2 where id = 1 for update",
Sql: "select id, name, lastname, name = 'myname' and lastname = 'mylastname' from user2 where id = 1 for update",
BindVariables: map[string]*querypb.BindVariable{},
}, {
Sql: "update user2 set name = 'myname', lastname = 'mylastname' where id = 1",
Expand Down
25 changes: 20 additions & 5 deletions go/vt/vtgate/engine/update.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ import (
"sort"
"time"

"vitess.io/vitess/go/vt/vtgate/evalengine"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"

"vitess.io/vitess/go/sqltypes"
Expand All @@ -36,14 +38,17 @@ import (
var _ Primitive = (*Update)(nil)

// VindexValues contains changed values for a vindex.
type VindexValues map[string]sqltypes.PlanValue
type VindexValues struct {
PvMap map[string]sqltypes.PlanValue
Offset int // Offset from ownedVindexQuery to provide input decision for vindex update.
}

// Update represents the instructions to perform an update.
type Update struct {
DML

// ChangedVindexValues contains values for updated Vindexes during an update statement.
ChangedVindexValues map[string]VindexValues
ChangedVindexValues map[string]*VindexValues

// Update does not take inputs
noInputs
Expand Down Expand Up @@ -227,13 +232,23 @@ func (upd *Update) updateVindexEntries(vcursor VCursor, bindVars map[string]*que
for _, colVindex := range upd.Table.Owned {
// Update columns only if they're being changed.
if updColValues, ok := upd.ChangedVindexValues[colVindex.Name]; ok {
offset := updColValues.Offset
if !row[offset].IsNull() {
val, err := evalengine.ToInt64(row[offset])
if err != nil {
return err
}
if val == int64(1) { // 1 means that the old and new value are same and vindex update is not required.
continue
}
}
fromIds := make([]sqltypes.Value, 0, len(colVindex.Columns))
var vindexColumnKeys []sqltypes.Value
for _, vCol := range colVindex.Columns {
// Fetch the column values.
origColValue := row[fieldColNumMap[vCol.String()]]
fromIds = append(fromIds, origColValue)
if colValue, exists := updColValues[vCol.String()]; exists {
if colValue, exists := updColValues.PvMap[vCol.String()]; exists {
resolvedVal, err := colValue.ResolveValue(bindVars)
if err != nil {
return err
Expand Down Expand Up @@ -266,8 +281,8 @@ func (upd *Update) description() PrimitiveDescription {
addFieldsIfNotEmpty(upd.DML, other)

var changedVindexes []string
for vindex := range upd.ChangedVindexValues {
changedVindexes = append(changedVindexes, vindex)
for k, v := range upd.ChangedVindexValues {
changedVindexes = append(changedVindexes, fmt.Sprintf("%s:%d", k, v.Offset))
}
sort.Strings(changedVindexes) // We sort these so random changes in the map order does not affect output
if len(changedVindexes) > 0 {
Expand Down
124 changes: 87 additions & 37 deletions go/vt/vtgate/engine/update_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,23 +205,29 @@ func TestUpdateEqualChangedVindex(t *testing.T) {
OwnedVindexQuery: "dummy_subquery",
KsidVindex: ks.Vindexes["hash"].(vindexes.SingleColumn),
},
ChangedVindexValues: map[string]VindexValues{
ChangedVindexValues: map[string]*VindexValues{
"twocol": {
"c1": {Value: sqltypes.NewInt64(1)},
"c2": {Value: sqltypes.NewInt64(2)},
PvMap: map[string]sqltypes.PlanValue{
"c1": {Value: sqltypes.NewInt64(1)},
"c2": {Value: sqltypes.NewInt64(2)},
},
Offset: 4,
},
"onecol": {
"c3": {Value: sqltypes.NewInt64(3)},
PvMap: map[string]sqltypes.PlanValue{
"c3": {Value: sqltypes.NewInt64(3)},
},
Offset: 5,
},
},
}

results := []*sqltypes.Result{sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"id|c1|c2|c3",
"int64|int64|int64|int64",
"id|c1|c2|c3|twocol|onecol",
"int64|int64|int64|int64|int64|int64",
),
"1|4|5|6",
"1|4|5|6|0|0",
)}
vc := newDMLTestVCursor("-20", "20-")
vc.results = results
Expand Down Expand Up @@ -258,14 +264,14 @@ func TestUpdateEqualChangedVindex(t *testing.T) {
`ExecuteMultiShard sharded.-20: dummy_update {} true true`,
})

// Failure case: multiple rows changing.
// multiple rows changing.
results = []*sqltypes.Result{sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"id|c1|c2|c3",
"int64|int64|int64|int64",
"id|c1|c2|c3|twocol|onecol",
"int64|int64|int64|int64|int64|int64",
),
"1|4|5|6",
"1|7|8|9",
"1|4|5|6|0|0",
"1|7|8|9|0|0",
)}
vc = newDMLTestVCursor("-20", "20-")
vc.results = results
Expand All @@ -284,8 +290,40 @@ func TestUpdateEqualChangedVindex(t *testing.T) {
// 6 has to be replaced by 3.
`Execute delete from lkp1 where from = :from and toc = :toc from: type:INT64 value:"6" toc: type:VARBINARY value:"\026k@\264J\272K\326" true`,
`Execute insert into lkp1(from, toc) values(:from_0, :toc_0) from_0: type:INT64 value:"3" toc_0: type:VARBINARY value:"\026k@\264J\272K\326" true`,
// 7,8 have to be replaced by 1,2 (the new values).
`Execute delete from lkp2 where from1 = :from1 and from2 = :from2 and toc = :toc from1: type:INT64 value:"7" from2: type:INT64 value:"8" toc: type:VARBINARY value:"\026k@\264J\272K\326" true`,
`Execute insert into lkp2(from1, from2, toc) values(:from1_0, :from2_0, :toc_0) from1_0: type:INT64 value:"1" from2_0: type:INT64 value:"2" toc_0: type:VARBINARY value:"\026k@\264J\272K\326" true`,
// 9 has to be replaced by 3.
`Execute delete from lkp1 where from = :from and toc = :toc from: type:INT64 value:"9" toc: type:VARBINARY value:"\026k@\264J\272K\326" true`,
`Execute insert into lkp1(from, toc) values(:from_0, :toc_0) from_0: type:INT64 value:"3" toc_0: type:VARBINARY value:"\026k@\264J\272K\326" true`,
// Finally, the actual update, which is also sent to -20, same route as the subquery.
`ExecuteMultiShard sharded.-20: dummy_update {} true true`,
})

// multiple rows changing, but only some vindex actually changes
results = []*sqltypes.Result{sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"id|c1|c2|c3|twocol|onecol",
"int64|int64|int64|int64|int64|int64",
),
"1|4|5|6|0|1", // twocol changes
"1|7|8|9|1|0", // onecol changes
)}
vc = newDMLTestVCursor("-20", "20-")
vc.results = results

_, err = upd.Execute(vc, map[string]*querypb.BindVariable{}, false)
require.NoError(t, err)
vc.ExpectLog(t, []string{
`ResolveDestinations sharded [] Destinations:DestinationKeyspaceID(166b40b44aba4bd6)`,
// ResolveDestinations is hard-coded to return -20.
// It gets used to perform the subquery to fetch the changing column values.
`ExecuteMultiShard sharded.-20: dummy_subquery {} false false`,
// Those values are returned as 4,5 for twocol and 6 for onecol.
// 4,5 have to be replaced by 1,2 (the new values).
`Execute delete from lkp2 where from1 = :from1 and from2 = :from2 and toc = :toc from1: type:INT64 value:"4" from2: type:INT64 value:"5" toc: type:VARBINARY value:"\026k@\264J\272K\326" true`,
`Execute insert into lkp2(from1, from2, toc) values(:from1_0, :from2_0, :toc_0) from1_0: type:INT64 value:"1" from2_0: type:INT64 value:"2" toc_0: type:VARBINARY value:"\026k@\264J\272K\326" true`,
// 9 has to be replaced by 3.
`Execute delete from lkp1 where from = :from and toc = :toc from: type:INT64 value:"9" toc: type:VARBINARY value:"\026k@\264J\272K\326" true`,
`Execute insert into lkp1(from, toc) values(:from_0, :toc_0) from_0: type:INT64 value:"3" toc_0: type:VARBINARY value:"\026k@\264J\272K\326" true`,
// Finally, the actual update, which is also sent to -20, same route as the subquery.
Expand All @@ -306,23 +344,29 @@ func TestUpdateScatterChangedVindex(t *testing.T) {
OwnedVindexQuery: "dummy_subquery",
KsidVindex: ks.Vindexes["hash"].(vindexes.SingleColumn),
},
ChangedVindexValues: map[string]VindexValues{
ChangedVindexValues: map[string]*VindexValues{
"twocol": {
"c1": {Value: sqltypes.NewInt64(1)},
"c2": {Value: sqltypes.NewInt64(2)},
PvMap: map[string]sqltypes.PlanValue{
"c1": {Value: sqltypes.NewInt64(1)},
"c2": {Value: sqltypes.NewInt64(2)},
},
Offset: 4,
},
"onecol": {
"c3": {Value: sqltypes.NewInt64(3)},
PvMap: map[string]sqltypes.PlanValue{
"c3": {Value: sqltypes.NewInt64(3)},
},
Offset: 5,
},
},
}

results := []*sqltypes.Result{sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"id|c1|c2|c3",
"int64|int64|int64|int64",
"id|c1|c2|c3|twocol|onecol",
"int64|int64|int64|int64|int64|int64",
),
"1|4|5|6",
"1|4|5|6|0|0",
)}
vc := newDMLTestVCursor("-20", "20-")
vc.results = results
Expand Down Expand Up @@ -362,11 +406,11 @@ func TestUpdateScatterChangedVindex(t *testing.T) {
// Update can affect multiple rows
results = []*sqltypes.Result{sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"id|c1|c2|c3",
"int64|int64|int64|int64",
"id|c1|c2|c3|twocol|onecol",
"int64|int64|int64|int64|int64|int64",
),
"1|4|5|6",
"1|7|8|9",
"1|4|5|6|0|0",
"1|7|8|9|0|0",
)}
vc = newDMLTestVCursor("-20", "20-")
vc.results = results
Expand Down Expand Up @@ -441,24 +485,30 @@ func TestUpdateInChangedVindex(t *testing.T) {
OwnedVindexQuery: "dummy_subquery",
KsidVindex: ks.Vindexes["hash"].(vindexes.SingleColumn),
},
ChangedVindexValues: map[string]VindexValues{
ChangedVindexValues: map[string]*VindexValues{
"twocol": {
"c1": {Value: sqltypes.NewInt64(1)},
"c2": {Value: sqltypes.NewInt64(2)},
PvMap: map[string]sqltypes.PlanValue{
"c1": {Value: sqltypes.NewInt64(1)},
"c2": {Value: sqltypes.NewInt64(2)},
},
Offset: 4,
},
"onecol": {
"c3": {Value: sqltypes.NewInt64(3)},
PvMap: map[string]sqltypes.PlanValue{
"c3": {Value: sqltypes.NewInt64(3)},
},
Offset: 5,
},
},
}

results := []*sqltypes.Result{sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"id|c1|c2|c3",
"int64|int64|int64|int64",
"id|c1|c2|c3|twocol|onecol",
"int64|int64|int64|int64|int64|int64",
),
"1|4|5|6",
"2|21|22|23",
"1|4|5|6|0|0",
"2|21|22|23|0|0",
)}
vc := newDMLTestVCursor("-20", "20-")
vc.results = results
Expand Down Expand Up @@ -501,15 +551,15 @@ func TestUpdateInChangedVindex(t *testing.T) {
`ExecuteMultiShard sharded.-20: dummy_update {} true true`,
})

// Failure case: multiple rows changing.
// multiple rows changing.
results = []*sqltypes.Result{sqltypes.MakeTestResult(
sqltypes.MakeTestFields(
"id|c1|c2|c3",
"int64|int64|int64|int64",
"id|c1|c2|c3|twocol|onecol",
"int64|int64|int64|int64|int64|int64",
),
"1|4|5|6",
"1|7|8|9",
"2|21|22|23",
"1|4|5|6|0|0",
"1|7|8|9|0|0",
"2|21|22|23|0|0",
)}
vc = newDMLTestVCursor("-20", "20-")
vc.results = results
Expand Down
12 changes: 6 additions & 6 deletions go/vt/vtgate/executor_dml_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,16 +101,16 @@ func TestUpdateEqual(t *testing.T) {
sbc2.Queries = nil
sbclookup.Queries = nil
sbc1.SetResults([]*sqltypes.Result{sqltypes.MakeTestResult(
sqltypes.MakeTestFields("id|name|lastname", "int64|int32|varchar"),
"1|1|foo",
sqltypes.MakeTestFields("id|name|lastname|name_lastname_keyspace_id_map", "int64|int32|varchar|int64"),
"1|1|foo|0",
),
})

_, err = executorExec(executor, "update user2 set name='myname', lastname='mylastname' where id = 1", nil)
require.NoError(t, err)
wantQueries = []*querypb.BoundQuery{
{
Sql: "select id, name, lastname from user2 where id = 1 for update",
Sql: "select id, name, lastname, name = 'myname' and lastname = 'mylastname' from user2 where id = 1 for update",
BindVariables: map[string]*querypb.BindVariable{},
},
{
Expand Down Expand Up @@ -213,16 +213,16 @@ func TestUpdateMultiOwned(t *testing.T) {

sbc1.SetResults([]*sqltypes.Result{
sqltypes.MakeTestResult(
sqltypes.MakeTestFields("id|a|b|c|d|e|f", "int64|int64|int64|int64|int64|int64|int64"),
"1|10|20|30|40|50|60",
sqltypes.MakeTestFields("id|a|b|c|d|e|f|lookup1|lookup3", "int64|int64|int64|int64|int64|int64|int64|int64|int64"),
"1|10|20|30|40|50|60|0|0",
),
})
_, err := executorExec(executor, "update user set a=1, b=2, f=4, e=3 where id=1", nil)
if err != nil {
t.Fatal(err)
}
wantQueries := []*querypb.BoundQuery{{
Sql: "select id, a, b, c, d, e, f from user where id = 1 for update",
Sql: "select id, a, b, c, d, e, f, a = 1 and b = 2, e = 3 and f = 4 from user where id = 1 for update",
BindVariables: map[string]*querypb.BindVariable{},
}, {
Sql: "update user set a = 1, b = 2, f = 4, e = 3 where id = 1",
Expand Down
2 changes: 1 addition & 1 deletion go/vt/vtgate/executor_select_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -418,7 +418,7 @@ func TestRowCount(t *testing.T) {
require.NoError(t, err)
testRowCount(t, executor, -1)

_, err = executorExec(executor, "update user set name = 'abc' where id in (42, 24)", map[string]*querypb.BindVariable{})
_, err = executorExec(executor, "delete from user where id in (42, 24)", map[string]*querypb.BindVariable{})
require.NoError(t, err)
testRowCount(t, executor, 2)
}
Expand Down
Loading

0 comments on commit d4bd3ba

Please sign in to comment.