Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for query payload limit #6143

Merged
merged 4 commits into from
Jun 22, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 23 additions & 0 deletions go/vt/sqlparser/comments.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ const (
DirectiveQueryTimeout = "QUERY_TIMEOUT_MS"
// DirectiveScatterErrorsAsWarnings enables partial success scatter select queries
DirectiveScatterErrorsAsWarnings = "SCATTER_ERRORS_AS_WARNINGS"
// DirectiveIgnoreMaxPayloadSize skips payload size validation when set.
DirectiveIgnoreMaxPayloadSize = "IGNORE_MAX_PAYLOAD_SIZE"
)

func isNonSpace(r rune) bool {
Expand Down Expand Up @@ -298,3 +300,24 @@ func SkipQueryPlanCacheDirective(stmt Statement) bool {
}
return false
}

// IgnoreMaxPayloadSizeDirective returns true if the max payload size override
// directive is set to true.
func IgnoreMaxPayloadSizeDirective(stmt Statement) bool {
switch stmt := stmt.(type) {
case *Select:
directives := ExtractCommentDirectives(stmt.Comments)
return directives.IsSet(DirectiveIgnoreMaxPayloadSize)
case *Insert:
directives := ExtractCommentDirectives(stmt.Comments)
return directives.IsSet(DirectiveIgnoreMaxPayloadSize)
case *Update:
directives := ExtractCommentDirectives(stmt.Comments)
return directives.IsSet(DirectiveIgnoreMaxPayloadSize)
case *Delete:
directives := ExtractCommentDirectives(stmt.Comments)
return directives.IsSet(DirectiveIgnoreMaxPayloadSize)
default:
return false
}
}
22 changes: 22 additions & 0 deletions go/vt/sqlparser/comments_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,11 @@ limitations under the License.
package sqlparser

import (
"fmt"
"reflect"
"testing"

"github.com/stretchr/testify/assert"
)

func TestSplitComments(t *testing.T) {
Expand Down Expand Up @@ -385,3 +388,22 @@ func TestSkipQueryPlanCacheDirective(t *testing.T) {
t.Errorf("d.SkipQueryPlanCacheDirective(stmt) should be true")
}
}

func TestIgnoreMaxPayloadSizeDirective(t *testing.T) {
testCases := []struct {
query string
expected bool
}{
{"insert /*vt+ IGNORE_MAX_PAYLOAD_SIZE=1 */ into user(id) values (1), (2)", true},
{"insert into user(id) values (1), (2)", false},
{"update /*vt+ IGNORE_MAX_PAYLOAD_SIZE=1 */ users set name=1", true},
{"select /*vt+ IGNORE_MAX_PAYLOAD_SIZE=1 */ * from users", true},
{"delete /*vt+ IGNORE_MAX_PAYLOAD_SIZE=1 */ from users", true},
}

for _, test := range testCases {
stmt, _ := Parse(test.query)
got := IgnoreMaxPayloadSizeDirective(stmt)
assert.Equalf(t, test.expected, got, fmt.Sprintf("d.IgnoreMaxPayloadSizeDirective(stmt) returned %v but expected %v", got, test.expected))
}
}
21 changes: 20 additions & 1 deletion go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1283,10 +1283,14 @@ func (e *Executor) getPlan(vcursor *vcursorImpl, sql string, comments sqlparser.
return nil, err
}

// Normalize if possible and retry.
query := sql
statement := stmt
bindVarNeeds := sqlparser.BindVarNeeds{}
if !sqlparser.IgnoreMaxPayloadSizeDirective(statement) && !isValidPayloadSize(query) {
return nil, vterrors.New(vtrpcpb.Code_RESOURCE_EXHAUSTED, "query payload size above threshold")
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be Code_FAILED_PRECONDITION instead?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think ResourceExhausted is fine, but it'd be nice to use a MySQL error code that indicates it's not retryable. In the upstream community Slack, we were discussing using MySQL error code 1153 for things like this and gRPC packet size exceeded going forward. I'm not sure where that MySQL error code is set and where the gRPC error code is set.

}

// Normalize if possible and retry.
if (e.normalize && sqlparser.CanNormalize(stmt)) || sqlparser.IsSetStatement(stmt) {
parameterize := e.normalize // the public flag is called normalize
result, err := sqlparser.PrepareAST(stmt, bindVars, "vtg", parameterize)
Expand Down Expand Up @@ -1495,6 +1499,21 @@ func checkLikeOpt(likeOpt string, colNames []string) (string, error) {
return "", nil
}

// isValidPayloadSize validates whether a query payload is above the
// configured MaxPayloadSize threshold. The WarnPayloadSizeExceeded will increment
// if the payload size exceeds the warnPayloadSize.

func isValidPayloadSize(query string) bool {
payloadSize := len(query)
if *maxPayloadSize > 0 && payloadSize > *maxPayloadSize {
return false
}
if *warnPayloadSize > 0 && payloadSize > *warnPayloadSize {
warnings.Add("WarnPayloadSizeExceeded", 1)
}
return true
}

// Prepare executes a prepare statements.
func (e *Executor) Prepare(ctx context.Context, method string, safeSession *SafeSession, sql string, bindVars map[string]*querypb.BindVariable) (fld []*querypb.Field, err error) {
logStats := NewLogStats(ctx, method, sql, bindVars)
Expand Down
48 changes: 48 additions & 0 deletions go/vt/vtgate/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1869,6 +1869,54 @@ func TestGenerateCharsetRows(t *testing.T) {
}
}

func TestExecutorMaxPayloadSizeExceeded(t *testing.T) {
saveMax := *maxPayloadSize
saveWarn := *warnPayloadSize
*maxPayloadSize = 10
*warnPayloadSize = 5
defer func() {
*maxPayloadSize = saveMax
*warnPayloadSize = saveWarn
}()

executor, _, _, _ := createExecutorEnv()
session := NewSafeSession(&vtgatepb.Session{TargetString: "@master"})
warningCount := warnings.Counts()["WarnPayloadSizeExceeded"]
testMaxPayloadSizeExceeded := []string{
"select * from main1",
"select * from main1",
"insert into main1(id) values (1), (2)",
"update main1 set id=1",
"delete from main1 where id=1",
}
for _, query := range testMaxPayloadSizeExceeded {
_, err := executor.Execute(context.Background(), "TestExecutorMaxPayloadSizeExceeded", session, query, nil)
if err == nil {
assert.EqualError(t, err, "query payload size above threshold")
}
}
assert.Equal(t, warningCount, warnings.Counts()["WarnPayloadSizeExceeded"], "warnings count")

testMaxPayloadSizeOverride := []string{
"select /*vt+ IGNORE_MAX_PAYLOAD_SIZE=1 */ * from main1",
"insert /*vt+ IGNORE_MAX_PAYLOAD_SIZE=1 */ into main1(id) values (1), (2)",
"update /*vt+ IGNORE_MAX_PAYLOAD_SIZE=1 */ main1 set id=1",
"delete /*vt+ IGNORE_MAX_PAYLOAD_SIZE=1 */ from main1 where id=1",
}
for _, query := range testMaxPayloadSizeOverride {
_, err := executor.Execute(context.Background(), "TestExecutorMaxPayloadSizeWithOverride", session, query, nil)
assert.Equal(t, nil, err, "err should be nil")
}
assert.Equal(t, warningCount, warnings.Counts()["WarnPayloadSizeExceeded"], "warnings count")

*maxPayloadSize = 1000
for _, query := range testMaxPayloadSizeExceeded {
_, err := executor.Execute(context.Background(), "TestExecutorMaxPayloadSizeExceeded", session, query, nil)
assert.Equal(t, nil, err, "err should be nil")
}
assert.Equal(t, warningCount+4, warnings.Counts()["WarnPayloadSizeExceeded"], "warnings count")
}

func TestOlapSelectDatabase(t *testing.T) {
executor, _, _, _ := createExecutorEnv()
executor.normalize = true
Expand Down
4 changes: 3 additions & 1 deletion go/vt/vtgate/vtgate.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ var (
HealthCheckRetryDelay = flag.Duration("healthcheck_retry_delay", 2*time.Millisecond, "health check retry delay")
// HealthCheckTimeout is the timeout on the RPC call to tablets
HealthCheckTimeout = flag.Duration("healthcheck_timeout", time.Minute, "the health check timeout period")
maxPayloadSize = flag.Int("max_payload_size", 0, "The threshold for query payloads in bytes. A payload greater than this threshold will result in a failure to handle the query.")
warnPayloadSize = flag.Int("warn_payload_size", 0, "The warning threshold for query payloads in bytes. A payload greater than this threshold will cause the VtGateWarnings.WarnPayloadSizeExceeded counter to be incremented.")
)

func getTxMode() vtgatepb.TransactionMode {
Expand Down Expand Up @@ -194,7 +196,7 @@ func Init(ctx context.Context, serv srvtopo.Server, cell string, tabletTypesToWa
_ = stats.NewRates("ErrorsByDbType", stats.CounterForDimension(errorCounts, "DbType"), 15, 1*time.Minute)
_ = stats.NewRates("ErrorsByCode", stats.CounterForDimension(errorCounts, "Code"), 15, 1*time.Minute)

warnings = stats.NewCountersWithSingleLabel("VtGateWarnings", "Vtgate warnings", "type", "IgnoredSet", "ResultsExceeded")
warnings = stats.NewCountersWithSingleLabel("VtGateWarnings", "Vtgate warnings", "type", "IgnoredSet", "ResultsExceeded", "WarnPayloadSizeExceeded")

servenv.OnRun(func() {
for _, f := range RegisterVTGates {
Expand Down