Skip to content

Commit

Permalink
Merge pull request cockroachdb#33261 from knz/backport2.1-33138
Browse files Browse the repository at this point in the history
release-2.1: sql: ensure that internal executors have a valid application_name
  • Loading branch information
knz authored Dec 28, 2018
2 parents 69f15bb + 175e5e0 commit 3c80c5e
Show file tree
Hide file tree
Showing 3 changed files with 176 additions and 9 deletions.
12 changes: 7 additions & 5 deletions pkg/sql/conn_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,10 @@ func (s *Server) newConnExecutor(
defaults: sessionDefaults,
settings: s.cfg.Settings,
curTxnReadOnly: &ex.state.readOnly,
// applicationNameChanged is used when setting app name in client
// sessions or when using the session defaults map. When
// populating session data for internal executors, we use a
// different logic, see below.
applicationNameChanged: func(newName string) {
ex.appStats = ex.server.sqlStats.getStatsForApplication(newName)
ex.applicationName.Store(newName)
Expand All @@ -503,11 +507,9 @@ func (s *Server) newConnExecutor(
return nil, err
}
} else {
// It's possible there were no defaults, for example when the
// connEx is serving an internal executor. In that case we still
// need to populate appStats according to the configured
// application name.
ex.appStats = s.sqlStats.getStatsForApplication(ex.sessionData.ApplicationName)
// We have set the ex.sessionData without using the dataMutator.
// So we need to update the application name manually.
ex.dataMutator.applicationNameChanged(ex.sessionData.ApplicationName)
}

ex.phaseTimes[sessionInit] = timeutil.Now()
Expand Down
163 changes: 163 additions & 0 deletions pkg/sql/internal_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,20 @@ package sql_test

import (
"context"
"fmt"
"strings"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/internal/client"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlbase"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
)
Expand Down Expand Up @@ -134,6 +140,163 @@ func TestSessionBoundInternalExecutor(t *testing.T) {
}
}

// TestInternalExecAppNameInitialization validates that the application name
// is properly initialized for both kinds of internal executors: the "standalone"
// internal executor and those that hang off client sessions ("session-bound").
// In both cases it does so by checking the result of SHOW application_name,
// the cancellability of the query, and the listing in the application statistics.
func TestInternalExecAppNameInitialization(t *testing.T) {
defer leaktest.AfterTest(t)()

params, _ := tests.CreateTestServerParams()
params.Insecure = true

// sem will be fired every time pg_sleep(1337666) is called.
sem := make(chan struct{})
params.Knobs.SQLExecutor = &sql.ExecutorTestingKnobs{
BeforeExecute: func(ctx context.Context, stmt string, _ /* isParallel*/ bool) {
if strings.Contains(stmt, "(1.337666") {
sem <- struct{}{}
}
},
}
s, _, _ := serverutils.StartServer(t, params)
defer s.Stopper().Stop(context.TODO())

t.Run("root internal exec", func(t *testing.T) {
testInternalExecutorAppNameInitialization(t, sem,
sql.InternalAppNamePrefix+"internal-test-query",
s.InternalExecutor().(*sql.InternalExecutor))
})

ie := sql.MakeSessionBoundInternalExecutor(
context.TODO(),
&sessiondata.SessionData{
User: security.RootUser,
Database: "defaultdb",
ApplicationName: "appname_findme",
SequenceState: &sessiondata.SequenceState{},
},
s.(*server.TestServer).Server.PGServer().SQLServer,
sql.MemoryMetrics{},
s.ExecutorConfig().(sql.ExecutorConfig).Settings,
)
t.Run("session bound exec", func(t *testing.T) {
testInternalExecutorAppNameInitialization(t, sem,
"appname_findme",
&ie)
})
}

type testInternalExecutor interface {
Query(
ctx context.Context, opName string, txn *client.Txn, stmt string, qargs ...interface{},
) ([]tree.Datums, sqlbase.ResultColumns, error)
Exec(
ctx context.Context, opName string, txn *client.Txn, stmt string, qargs ...interface{},
) (int, error)
}

func testInternalExecutorAppNameInitialization(
t *testing.T, sem chan struct{}, expectedAppName string, ie testInternalExecutor,
) {
// Check that the application_name is set properly in the executor.
if rows, _, err := ie.Query(context.TODO(), "test-query", nil,
"SHOW application_name"); err != nil {
t.Fatal(err)
} else if len(rows) != 1 {
t.Fatalf("expected 1 row, got: %+v", rows)
} else if appName := string(*rows[0][0].(*tree.DString)); appName != expectedAppName {
t.Fatalf("unexpected app name: expected %q, got %q", expectedAppName, appName)
}

// Start a background query using the internal executor. We want to
// have this keep running until we cancel it below.
errChan := make(chan error)
go func() {
_, _, err := ie.Query(context.TODO(),
"test-query",
nil, /* txn */
"SELECT pg_sleep(1337666)")
if err != nil {
errChan <- err
return
}
}()

<-sem

// We'll wait until the query appears in SHOW QUERIES.
// When it does, we capture the query ID.
var queryID string
testutils.SucceedsSoon(t, func() error {
rows, _, err := ie.Query(context.TODO(),
"find-query",
nil, /* txn */
// We need to assemble the magic string so that this SELECT
// does not find itself.
"SELECT query_id, application_name FROM [SHOW QUERIES] WHERE query LIKE '%337' || '666%'")
if err != nil {
return err
}
switch len(rows) {
case 0:
// The SucceedsSoon test may find this a couple of times before
// this succeeds.
return fmt.Errorf("query not started yet")
case 1:
appName := string(*rows[0][1].(*tree.DString))
if appName != expectedAppName {
return fmt.Errorf("unexpected app name: expected %q, got %q", expectedAppName, appName)
}

// Good app name, retrieve query ID for later cancellation.
queryID = string(*rows[0][0].(*tree.DString))
return nil
default:
return fmt.Errorf("unexpected results: %+v", rows)
}
})

// Check that the query shows up in the internal tables without error.
if rows, _, err := ie.Query(context.TODO(), "find-query", nil,
"SELECT application_name FROM crdb_internal.node_queries WHERE query LIKE '%337' || '666%'"); err != nil {
t.Fatal(err)
} else if len(rows) != 1 {
t.Fatalf("expected 1 query, got: %+v", rows)
} else if appName := string(*rows[0][0].(*tree.DString)); appName != expectedAppName {
t.Fatalf("unexpected app name: expected %q, got %q", expectedAppName, appName)
}

// We'll want to look at statistics below, and finish the test with
// no goroutine leakage. To achieve this, cancel the query. and
// drain the goroutine.
if _, err := ie.Exec(context.TODO(), "cancel-query", nil, "CANCEL QUERY $1", queryID); err != nil {
t.Fatal(err)
}
select {
case err := <-errChan:
if !isClientsideQueryCanceledErr(err) {
t.Fatal(err)
}
case <-time.After(time.Second * 5):
t.Fatal("no error received from query supposed to be canceled")
}

// TODO(knz): remove this skip when we log internal queries in stats.
t.Skip("#32215")

// Now check that it was properly registered in statistics.
if rows, _, err := ie.Query(context.TODO(), "find-query", nil,
"SELECT application_name FROM crdb_internal.node_statement_statistics WHERE key LIKE 'SELECT' || ' pg_sleep('"); err != nil {
t.Fatal(err)
} else if len(rows) != 1 {
t.Fatalf("expected 1 query, got: %+v", rows)
} else if appName := string(*rows[0][0].(*tree.DString)); appName != expectedAppName {
t.Fatalf("unexpected app name: expected %q, got %q", expectedAppName, appName)
}
}

// TODO(andrei): Test that descriptor leases are released by the
// InternalExecutor, with and without a higher-level txn. When there is no
// higher-level txn, the leases are released normally by the txn finishing. When
Expand Down
10 changes: 6 additions & 4 deletions pkg/sql/run_control_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -450,9 +450,11 @@ func TestCancelIfExists(t *testing.T) {
}

func isClientsideQueryCanceledErr(err error) bool {
pqErr, ok := err.(*pq.Error)
if !ok {
return false
if pgErr, ok := pgerror.GetPGCause(err); ok {
return pgErr.Code == pgerror.CodeQueryCanceledError
}
return pqErr.Code == pgerror.CodeQueryCanceledError
if pqErr, ok := err.(*pq.Error); ok {
return pqErr.Code == pgerror.CodeQueryCanceledError
}
return false
}

0 comments on commit 3c80c5e

Please sign in to comment.