Skip to content

Commit

Permalink
ccl/sqlproxyccl: parse private endpoint IDs in the PROXY header
Browse files Browse the repository at this point in the history
This commit parses the private endpoint IDs within the PROXY header. Doing
this would also mean that the ACL for private endpoints will be enforced under
the right conditions. The parsing logic assumes that there can only be one
type of endpoint ID within the PROXY header (i.e. AWS, GCP, or Azure).

Release note: None
  • Loading branch information
jaylim-crl committed May 15, 2023
1 parent d1fc05e commit 42caad4
Show file tree
Hide file tree
Showing 6 changed files with 275 additions and 5 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/sqlproxyccl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ go_test(
"@com_github_jackc_pgproto3_v2//:pgproto3",
"@com_github_jackc_pgx_v4//:pgx",
"@com_github_pires_go_proxyproto//:go-proxyproto",
"@com_github_pires_go_proxyproto//tlvparse",
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
"@in_gopkg_yaml_v3//:yaml_v3",
Expand Down
4 changes: 4 additions & 0 deletions pkg/ccl/sqlproxyccl/acl/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ go_library(
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_google_btree//:btree",
"@com_github_pires_go_proxyproto//:go-proxyproto",
"@com_github_pires_go_proxyproto//tlvparse",
"@in_gopkg_yaml_v2//:yaml_v2",
],
)
Expand All @@ -47,6 +49,8 @@ go_test(
"//pkg/util/timeutil",
"@com_github_cockroachdb_errors//:errors",
"@com_github_google_btree//:btree",
"@com_github_pires_go_proxyproto//:go-proxyproto",
"@com_github_pires_go_proxyproto//tlvparse",
"@com_github_stretchr_testify//require",
"@in_gopkg_yaml_v2//:yaml_v2",
],
Expand Down
37 changes: 37 additions & 0 deletions pkg/ccl/sqlproxyccl/acl/private_endpoints.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,14 @@ package acl

import (
"context"
"net"
"strconv"

"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/errors"
"github.com/pires/go-proxyproto"
"github.com/pires/go-proxyproto/tlvparse"
)

type lookupTenantFunc func(ctx context.Context, tenantID roachpb.TenantID) (*tenant.Tenant, error)
Expand Down Expand Up @@ -56,3 +60,36 @@ func (p *PrivateEndpoints) CheckConnection(ctx context.Context, conn ConnectionT
conn.TenantID.String(),
)
}

// FindPrivateEndpointID looks for the endpoint identifier within the connection
// object (which must be a *proxyproto.Conn) and returns that. If no endpoint
// IDs are found, an empty string will be returned.
func FindPrivateEndpointID(conn net.Conn) (string, error) {
proxyConn, ok := conn.(*proxyproto.Conn)
if !ok {
// This should not happen.
return "", errors.New("connection isn't a proxyproto.Conn")
}
header := proxyConn.ProxyHeader()
// Not a private connection.
if header == nil {
return "", nil
}
tlvs, err := header.TLVs()
if err != nil {
return "", err
}
// AWS.
if eid := tlvparse.FindAWSVPCEndpointID(tlvs); eid != "" {
return eid, nil
}
// Azure.
if eid, found := tlvparse.FindAzurePrivateEndpointLinkID(tlvs); found {
return strconv.FormatUint(uint64(eid), 10), nil
}
// GCP.
if eid, found := tlvparse.ExtractPSCConnectionID(tlvs); found {
return strconv.FormatUint(eid, 10), nil
}
return "", nil
}
81 changes: 81 additions & 0 deletions pkg/ccl/sqlproxyccl/acl/private_endpoints_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,21 @@ package acl_test

import (
"context"
"net"
"testing"

"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/acl"
"github.com/cockroachdb/cockroach/pkg/ccl/sqlproxyccl/tenant"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/errors"
"github.com/pires/go-proxyproto"
"github.com/pires/go-proxyproto/tlvparse"
"github.com/stretchr/testify/require"
)

func TestPrivateEndpoints(t *testing.T) {
defer leaktest.AfterTest(t)()
ctx := context.Background()

makeConn := func(endpoint string) acl.ConnectionTags {
Expand Down Expand Up @@ -94,3 +99,79 @@ func TestPrivateEndpoints(t *testing.T) {
require.EqualError(t, err, "connection to '42' denied: cluster does not allow this private connection")
})
}

func TestFindPrivateEndpointID(t *testing.T) {
defer leaktest.AfterTest(t)()

newHeader := func(t *testing.T, tlvs []proxyproto.TLV) *proxyproto.Header {
t.Helper()
h := &proxyproto.Header{
Version: 2,
Command: proxyproto.PROXY,
TransportProtocol: proxyproto.TCPv4,
SourceAddr: &net.TCPAddr{
IP: net.ParseIP("10.1.1.1"),
Port: 1000,
},
DestinationAddr: &net.TCPAddr{
IP: net.ParseIP("20.2.2.2"),
Port: 2000,
},
}
require.NoError(t, h.SetTLVs(tlvs))
return h
}
awsTLV := proxyproto.TLV{
Type: tlvparse.PP2_TYPE_AWS,
Value: []byte{0x01, 0x76, 0x70, 0x63, 0x65, 0x2d, 0x61, 0x62, 0x63, 0x31, 0x32, 0x33},
}
azureTLV := proxyproto.TLV{
Type: tlvparse.PP2_TYPE_AZURE,
Value: []byte{0x1, 0xc1, 0x45, 0x0, 0x21},
}
gcpTLV := proxyproto.TLV{
Type: tlvparse.PP2_TYPE_GCP,
Value: []byte{'\xff', '\xff', '\xff', '\xff', '\xc0', '\xa8', '\x64', '\x02'},
}
miscTLV := proxyproto.TLV{
Type: proxyproto.PP2_TYPE_AUTHORITY,
Value: []byte("cockroachlabs.com"),
}

t.Run("not a proxyproto.Conn", func(t *testing.T) {
p1, _ := net.Pipe()
defer p1.Close()

eid, err := acl.FindPrivateEndpointID(p1)
require.EqualError(t, err, "connection isn't a proxyproto.Conn")
require.Empty(t, eid)
})

for _, tc := range []struct {
name string
tlvs []proxyproto.TLV
expected string
}{
{"no values", []proxyproto.TLV{}, ""},
{"unrelated values", []proxyproto.TLV{miscTLV}, ""},
{"aws values", []proxyproto.TLV{awsTLV}, "vpce-abc123"},
{"azure values", []proxyproto.TLV{azureTLV}, "553665985"},
{"gcp values", []proxyproto.TLV{gcpTLV}, "18446744072646845442"},
{"multiple values", []proxyproto.TLV{gcpTLV, awsTLV, azureTLV}, "vpce-abc123"},
} {
t.Run(tc.name, func(t *testing.T) {
p1, p2 := net.Pipe()
defer p1.Close()
defer p2.Close()

go func(tlvs []proxyproto.TLV) {
_, err := newHeader(t, tlvs).WriteTo(p2)
require.NoError(t, err)
}(tc.tlvs)
conn := proxyproto.NewConn(p1)
eid, err := acl.FindPrivateEndpointID(conn)
require.NoError(t, err)
require.Equal(t, tc.expected, eid)
})
}
}
20 changes: 15 additions & 5 deletions pkg/ccl/sqlproxyccl/proxy_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,18 @@ func newProxyHandler(
func (handler *proxyHandler) handle(ctx context.Context, incomingConn net.Conn) error {
connReceivedTime := timeutil.Now()

// Parse headers before admitting the connection since the connection may
// be upgraded to TLS.
var endpointID string
if handler.RequireProxyProtocol {
var err error
endpointID, err = acl.FindPrivateEndpointID(incomingConn)
if err != nil {
updateMetricsAndSendErrToClient(err, incomingConn, handler.metrics)
return err
}
}

fe := FrontendAdmit(incomingConn, handler.incomingTLSConfig())
defer func() { _ = fe.Conn.Close() }()
if fe.Err != nil {
Expand Down Expand Up @@ -380,11 +392,9 @@ func (handler *proxyHandler) handle(ctx context.Context, incomingConn net.Conn)
removeListener, err := handler.aclWatcher.ListenForDenied(
ctx,
acl.ConnectionTags{
IP: ipAddr,
TenantID: tenID,
// TODO(jaylim-crl): Parse PROXY headers and include endpoint ID,
// if there is one. If PROXY protocol isn't used, we shouldn't parse.
EndpointID: "",
IP: ipAddr,
TenantID: tenID,
EndpointID: endpointID,
},
func(err error) {
err = withCode(
Expand Down
137 changes: 137 additions & 0 deletions pkg/ccl/sqlproxyccl/proxy_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import (
pgproto3 "github.com/jackc/pgproto3/v2"
pgx "github.com/jackc/pgx/v4"
proxyproto "github.com/pires/go-proxyproto"
"github.com/pires/go-proxyproto/tlvparse"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v3"
Expand Down Expand Up @@ -206,6 +207,142 @@ func TestProxyProtocol(t *testing.T) {
})
}

func TestPrivateEndpointsACL(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
te := newTester()
defer te.Close()

sql, db, _ := serverutils.StartServer(t, base.TestServerArgs{
Insecure: false,
// Need to disable the test tenant here because it appears as though
// we're not able to establish the necessary connections from within
// it. More investigation required (tracked with #76378).
DefaultTestTenant: base.TestTenantDisabled,
})
sql.(*server.TestServer).PGPreServer().TestingSetTrustClientProvidedRemoteAddr(true)
defer sql.Stopper().Stop(ctx)

// Create a default user.
sqlDB := sqlutils.MakeSQLRunner(db)
sqlDB.Exec(t, `CREATE USER bob WITH PASSWORD 'builder'`)

// Create the directory server.
tds := tenantdirsvr.NewTestStaticDirectoryServer(sql.Stopper(), nil /* timeSource */)
tenant10 := roachpb.MustMakeTenantID(10)
tenant20 := roachpb.MustMakeTenantID(20)
tenant30 := roachpb.MustMakeTenantID(30)
tds.CreateTenant(tenant10, &tenant.Tenant{
TenantID: tenant10.ToUint64(),
ClusterName: "my-tenant",
ConnectivityType: tenant.ALLOW_PUBLIC | tenant.ALLOW_PRIVATE,
PrivateEndpoints: []string{"vpce-abc123"},
})
tds.CreateTenant(tenant20, &tenant.Tenant{
TenantID: tenant20.ToUint64(),
ClusterName: "other-tenant",
ConnectivityType: tenant.ALLOW_PUBLIC | tenant.ALLOW_PRIVATE,
PrivateEndpoints: []string{"vpce-some-other-vpc"},
})
tds.CreateTenant(tenant30, &tenant.Tenant{
TenantID: tenant30.ToUint64(),
ClusterName: "public-tenant",
ConnectivityType: tenant.ALLOW_PUBLIC,
PrivateEndpoints: []string{},
})
// All tenants map to the same pod.
for _, tenID := range []roachpb.TenantID{tenant10, tenant20, tenant30} {
tds.AddPod(tenID, &tenant.Pod{
TenantID: tenID.ToUint64(),
Addr: sql.ServingSQLAddr(),
State: tenant.RUNNING,
StateTimestamp: timeutil.Now(),
})
}
require.NoError(t, tds.Start(ctx))

options := &ProxyOptions{
SkipVerify: true,
RequireProxyProtocol: true,
}
options.testingKnobs.directoryServer = tds
s, sqlAddr, _ := newSecureProxyServer(ctx, t, sql.Stopper(), options)

timeout := 3 * time.Second
proxyDialer := func(ctx context.Context, network, addr string) (net.Conn, error) {
conn, err := (&net.Dialer{Timeout: timeout}).Dial(network, addr)
if err != nil {
return nil, err
}
header := &proxyproto.Header{
Version: 2,
Command: proxyproto.PROXY,
TransportProtocol: proxyproto.TCPv4,
SourceAddr: &net.TCPAddr{
// Use a dummy address so we can check on that.
IP: net.ParseIP("10.20.30.40"),
Port: 4242,
},
DestinationAddr: conn.RemoteAddr(),
}
if err := header.SetTLVs([]proxyproto.TLV{{
Type: tlvparse.PP2_TYPE_AWS,
// Points to "vpce-abc123" as endpoint ID.
Value: []byte{0x01, 0x76, 0x70, 0x63, 0x65, 0x2d, 0x61, 0x62, 0x63, 0x31, 0x32, 0x33},
}}); err != nil {
return nil, err
}
if err := conn.SetWriteDeadline(timeutil.Now().Add(timeout)); err != nil {
return nil, err
}
_, err = header.WriteTo(conn)
if err != nil {
return nil, err
}
return conn, nil
}

t.Run("private connection allowed", func(t *testing.T) {
url := fmt.Sprintf("postgres://bob:builder@%s/my-tenant-10.defaultdb?sslmode=require", sqlAddr)
te.TestConnectWithPGConfig(
ctx, t, url,
func(c *pgx.ConnConfig) {
c.DialFunc = proxyDialer
},
func(conn *pgx.Conn) {
require.Equal(t, int64(1), s.metrics.CurConnCount.Value())
require.NoError(t, runTestQuery(ctx, conn))
},
)
})

t.Run("private connection disallowed on another tenant", func(t *testing.T) {
url := fmt.Sprintf("postgres://bob:builder@%s/other-tenant-20.defaultdb?sslmode=require", sqlAddr)
_ = te.TestConnectErrWithPGConfig(
ctx, t, url,
func(c *pgx.ConnConfig) {
c.DialFunc = proxyDialer
},
codeProxyRefusedConnection,
"connection refused",
)
})

t.Run("private connection disallowed on public tenant", func(t *testing.T) {
url := fmt.Sprintf("postgres://bob:builder@%s/public-tenant-30.defaultdb?sslmode=require", sqlAddr)
_ = te.TestConnectErrWithPGConfig(
ctx, t, url,
func(c *pgx.ConnConfig) {
c.DialFunc = proxyDialer
},
codeProxyRefusedConnection,
"connection refused",
)
})
}

func TestLongDBName(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
Expand Down

0 comments on commit 42caad4

Please sign in to comment.