Skip to content

Commit

Permalink
ccl/sqlproxyccl: add an interval poller for private endpoint updates
Browse files Browse the repository at this point in the history
There is a requirement of automatically disconnecting existing connections if
the ACL rules no longer match the connection's state. For the file-based ACLs,
we poll every 1 minute, read the file, and check on every single connection.

Here, we implement a similar mechanism for private endpoint updates. This is
reasonable for now as:
  1. Calls to LookupTenant are cached most of the time.
  2. This is only applied to existing connections, and a polling interval of
     1 minute isn't too bad.
  3. We are already iterating all the connections for the other types ACLs

That said, this approach isn't efficient and we're limited by the existing
design of AccessController. Checking all the connections each time the watch
ticks isn't ideal. In fact, we're checking three times here, one for each ACL.
The directory cache already knows which tenants were updated through
WatchTenants, and we can definitely do better here. A follow up TODO has been
added to refactor AccessController in a way that allows updates to be batched.
We should also only check connections only for tenants that have been updated.

Release note: None
  • Loading branch information
jaylim-crl committed May 15, 2023
1 parent 42caad4 commit 5913539
Show file tree
Hide file tree
Showing 3 changed files with 104 additions and 10 deletions.
6 changes: 1 addition & 5 deletions pkg/ccl/sqlproxyccl/acl/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,6 @@ import (
"gopkg.in/yaml.v2"
)

const (
defaultPollingInterval = time.Minute
)

type FromFile interface {
AccessController
yaml.Unmarshaler
Expand Down Expand Up @@ -87,7 +83,7 @@ func readFile[T FromFile](ctx context.Context, filename string, postRead func(c
return f, nil
}

// WatchForUpdates periodically reloads the access control list file. The daemon is
// watchForUpdate periodically reloads the access control list file. The daemon is
// canceled on ctx cancellation.
func watchForUpdate[T FromFile](
ctx context.Context,
Expand Down
68 changes: 63 additions & 5 deletions pkg/ccl/sqlproxyccl/acl/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
Expand Down Expand Up @@ -117,6 +118,10 @@ func WithLookupTenantFn(fn lookupTenantFunc) Option {
}
}

const (
defaultPollingInterval = time.Minute
)

func NewWatcher(ctx context.Context, opts ...Option) (*Watcher, error) {
options := &aclOptions{
pollingInterval: defaultPollingInterval,
Expand Down Expand Up @@ -162,12 +167,32 @@ func NewWatcher(ctx context.Context, opts ...Option) (*Watcher, error) {
w.addAccessController(ctx, c, next)
}
if w.options.lookupTenantFn != nil {
// TODO(jaylim-crl): Add a watcher or some sort of mechanism to react
// to metadata updates. newAccessControllerFromFile uses a file-based
// watcher. For now, this only checks on startup.
w.addAccessController(ctx, &PrivateEndpoints{
controller := &PrivateEndpoints{
LookupTenantFn: w.options.lookupTenantFn,
}, nil)
}
// We use a normal polling interval to determine when we should check
// the connections for private endpoints update. This is reasonable
// for now as:
// 1. Calls to LookupTenant are cached most of the time.
// 2. This is only applied to existing connections, and a polling
// interval of 1 minute isn't too bad.
// 3. We are already iterating all the connections for the other types
// of ACLs.
//
// TODO(jaylim-crl): The directory cache already knows which tenants
// are updated through WatchTenants. We can do better here. Refactor
// AccessController in a way that allows those tenant metadata updates
// to be batched, and check connections for those tenants only. At the
// same time, the current AccessController design is poor because we
// iterate through all the connections for each ACL update (i.e. 1 for
// allowlist, 1 for denylist, and another for private endpoints).
next := pollAndUpdateChan(
ctx,
w.options.timeSource,
w.options.pollingInterval,
controller,
)
w.addAccessController(ctx, controller, next)
}
return w, nil
}
Expand Down Expand Up @@ -289,3 +314,36 @@ func checkConnection(
}
return nil
}

// pollAndUpdateChan sends the same access controller object into the returned
// channel every pollingInterval. This is used by ACL types that require calls
// to the directory cache (and don't store local state).
//
// See caller of pollAndUpdateChan for more information. The current design
// of AccessController doesn't allow us to batch events and check a subset of
// connections, so we'd have to do this.
func pollAndUpdateChan(
ctx context.Context,
timeSource timeutil.TimeSource,
pollingInterval time.Duration,
accessController AccessController,
) chan AccessController {
result := make(chan AccessController)
go func() {
t := timeSource.NewTimer()
defer t.Stop()
for {
t.Reset(pollingInterval)
select {
case <-ctx.Done():
close(result)
log.Errorf(ctx, "WatchList daemon stopped: %v", ctx.Err())
return
case <-t.Ch():
t.MarkRead()
result <- accessController
}
}
}()
return result
}
40 changes: 40 additions & 0 deletions pkg/ccl/sqlproxyccl/proxy_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -235,18 +235,21 @@ func TestPrivateEndpointsACL(t *testing.T) {
tenant20 := roachpb.MustMakeTenantID(20)
tenant30 := roachpb.MustMakeTenantID(30)
tds.CreateTenant(tenant10, &tenant.Tenant{
Version: "001",
TenantID: tenant10.ToUint64(),
ClusterName: "my-tenant",
ConnectivityType: tenant.ALLOW_PUBLIC | tenant.ALLOW_PRIVATE,
PrivateEndpoints: []string{"vpce-abc123"},
})
tds.CreateTenant(tenant20, &tenant.Tenant{
Version: "002",
TenantID: tenant20.ToUint64(),
ClusterName: "other-tenant",
ConnectivityType: tenant.ALLOW_PUBLIC | tenant.ALLOW_PRIVATE,
PrivateEndpoints: []string{"vpce-some-other-vpc"},
})
tds.CreateTenant(tenant30, &tenant.Tenant{
Version: "003",
TenantID: tenant30.ToUint64(),
ClusterName: "public-tenant",
ConnectivityType: tenant.ALLOW_PUBLIC,
Expand All @@ -266,6 +269,7 @@ func TestPrivateEndpointsACL(t *testing.T) {
options := &ProxyOptions{
SkipVerify: true,
RequireProxyProtocol: true,
PollConfigInterval: 10 * time.Millisecond,
}
options.testingKnobs.directoryServer = tds
s, sqlAddr, _ := newSecureProxyServer(ctx, t, sql.Stopper(), options)
Expand Down Expand Up @@ -312,8 +316,44 @@ func TestPrivateEndpointsACL(t *testing.T) {
c.DialFunc = proxyDialer
},
func(conn *pgx.Conn) {
// Initial connection.
require.Equal(t, int64(1), s.metrics.CurConnCount.Value())
require.NoError(t, runTestQuery(ctx, conn))

// Remove endpoints and connection should be disconnected.
tds.UpdateTenant(tenant10, &tenant.Tenant{
Version: "010",
TenantID: tenant10.ToUint64(),
ClusterName: "my-tenant",
ConnectivityType: tenant.ALLOW_PUBLIC | tenant.ALLOW_PRIVATE,
PrivateEndpoints: []string{},
})

// Wait until watcher has received the updated event.
testutils.SucceedsSoon(t, func() error {
ten, err := s.handler.directoryCache.LookupTenant(ctx, tenant10)
if err != nil {
return err
}
if ten.Version != "010" {
return errors.New("tenant is not up-to-date")
}
return nil
})

// Subsequent Exec calls will eventually fail.
var err error
require.Eventually(
t,
func() bool {
_, err = conn.Exec(ctx, "SELECT 1")
return err != nil
},
time.Second, 5*time.Millisecond,
"Expected the connection to eventually fail",
)
require.Regexp(t, "unexpected EOF", err.Error())
require.Equal(t, int64(1), s.metrics.ExpiredClientConnCount.Count())
},
)
})
Expand Down

0 comments on commit 5913539

Please sign in to comment.