Skip to content

Commit

Permalink
server: pod-to-pod fanout for statements api on tenant
Browse files Browse the repository at this point in the history
Previously, the Statements endpoint implementation was purely local in
its execution and reported only in-memory data on the current tenant.
This change initializes a gRPC and gRPC-gateway server on the tenant,
much like we do on the hosts already, and uses the newly added
InstanceID subsystem for tenants to implement a gRPC-based fanout for
the Statements endpoint implementation.

The fanout is done much in the same manner as on hosts and we
expose the status server endpoints via HTTP on the tenant as well.

This change is necessary to support serverless observability features
and provide our UIs access to the Statements endpoint. Future work may
move this API to SQL-only

Resolves cockroachdb#64477

Release note (api change): tenant pods now expose the Statements API at
`/_status/statements` on their HTTP port.

REVIEWER NOTE: This change is based on cockroachdb#66600 which is still in
progress, please only review the final commit. Once cockroachdb#66600 is merged,
only the final commit will remain on rebase.
  • Loading branch information
dhartunian committed Jul 22, 2021
1 parent 330382b commit 11febcb
Show file tree
Hide file tree
Showing 10 changed files with 603 additions and 116 deletions.
111 changes: 111 additions & 0 deletions pkg/ccl/serverccl/tenant_grpc_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
// Copyright 2021 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package serverccl

import (
"context"
"io/ioutil"
"net/http"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/rpc"
"github.com/cockroachdb/cockroach/pkg/server/serverpb"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/httputil"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/stretchr/testify/require"
)

// TestTenantGRPCServices tests that the gRPC servers that are externally
// facing have been started up on the tenant server. This includes gRPC that is
// used for pod-to-pod communication as well as the HTTP services powered by
// gRPC Gateway that are used to serve endpoints to power observability UIs.
func TestTenantGRPCServices(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()

testCluster := serverutils.StartNewTestCluster(t, 3, base.TestClusterArgs{})
defer testCluster.Stopper().Stop(ctx)

server := testCluster.Server(0)

tenantID := roachpb.MakeTenantID(10)
tenant, connTenant := serverutils.StartTenant(t, server, base.TestTenantArgs{
TenantID: tenantID,
})
defer connTenant.Close()

t.Run("gRPC is running", func(t *testing.T) {
grpcAddr := tenant.SQLAddr()
rpcCtx := tenant.RPCContext()

conn, err := rpcCtx.GRPCDialNode(grpcAddr, roachpb.NodeID(tenant.SQLInstanceID()), rpc.DefaultClass).Connect(ctx)
require.NoError(t, err)
defer conn.Close()

client := serverpb.NewStatusClient(conn)

resp, err := client.Statements(ctx, &serverpb.StatementsRequest{NodeID: "local"})
require.NoError(t, err)
require.NotEmpty(t, resp.Statements)
})

t.Run("gRPC Gateway is running", func(t *testing.T) {
resp, err := httputil.Get(ctx, "http://"+tenant.HTTPAddr()+"/_status/statements")
defer http.DefaultClient.CloseIdleConnections()
require.NoError(t, err)
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
require.NoError(t, err)
require.Contains(t, string(body), "transactions")
})

sqlRunner := sqlutils.MakeSQLRunner(connTenant)
sqlRunner.Exec(t, "CREATE TABLE test (id int)")
sqlRunner.Exec(t, "INSERT INTO test VALUES (1)")

tenant2, connTenant2 := serverutils.StartTenant(t, server, base.TestTenantArgs{
TenantID: tenantID,
Existing: true,
})
defer connTenant2.Close()

t.Run("statements endpoint fans out request to multiple pods", func(t *testing.T) {
resp, err := httputil.Get(ctx, "http://"+tenant2.HTTPAddr()+"/_status/statements")
defer http.DefaultClient.CloseIdleConnections()
require.NoError(t, err)
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
require.NoError(t, err)
require.Contains(t, string(body), "CREATE TABLE test")
require.Contains(t, string(body), "INSERT INTO test VALUES")
})

tenant3, connTenant3 := serverutils.StartTenant(t, server, base.TestTenantArgs{
TenantID: roachpb.MakeTenantID(11),
})
defer connTenant3.Close()

t.Run("fanout of statements endpoint is segregated by tenant", func(t *testing.T) {
resp, err := httputil.Get(ctx, "http://"+tenant3.HTTPAddr()+"/_status/statements")
defer http.DefaultClient.CloseIdleConnections()
require.NoError(t, err)
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
require.NoError(t, err)
require.NotContains(t, string(body), "CREATE TABLE test")
require.NotContains(t, string(body), "INSERT INTO test VALUES")
})
}
4 changes: 4 additions & 0 deletions pkg/rpc/auth_tenant.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ func (a tenantAuthorizer) authorize(
case "/cockroach.rpc.Heartbeat/Ping":
return nil // no authorization

case "/cockroach.server.serverpb.Status/Statements":
// The Statements endpoint requires only SQL
return nil // no authorization

default:
return authErrorf("unknown method %q", fullMethod)
}
Expand Down
13 changes: 13 additions & 0 deletions pkg/rpc/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -1018,6 +1018,19 @@ func (ctx *Context) GRPCDialNode(
return ctx.grpcDialNodeInternal(target, remoteNodeID, class)
}

// GRPCDialPod wraps GRPCDialNode and treats the `remoteInstanceID`
// argument as a `NodeID` which it converts. This works because the
// tenant gRPC server is initialized using the `InstanceID` so it
// accepts our connection as matching the ID we're dialing.
//
// Since GRPCDialNode accepts a separate `target` and `NodeID` it
// requires no further modification to work between pods.
func (ctx *Context) GRPCDialPod(
target string, remoteInstanceID base.SQLInstanceID, class ConnectionClass,
) *Connection {
return ctx.GRPCDialNode(target, roachpb.NodeID(remoteInstanceID), class)
}

func (ctx *Context) grpcDialNodeInternal(
target string, remoteNodeID roachpb.NodeID, class ConnectionClass,
) *Connection {
Expand Down
Loading

0 comments on commit 11febcb

Please sign in to comment.