Skip to content

Commit

Permalink
feat: Automatially check for DNS changes periodically. On change, clo…
Browse files Browse the repository at this point in the history
…se all connections and create a new dialer.

chore: Expose the refresh strategy UseIAMAuthN() value to the dialer. 

Part of #842

chore: Add domain name to the cloudsql.ConnName struct

Feat: Check for DNS changes on connect. On change, close all connections and create a new dialer.

feat: Automatially check for DNS changes periodically. On change, close all connections and create a new dialer.

wip: eno changes

wip: eno interface cleanup

wip: convert monitoredInstance to *monitoredInstance
  • Loading branch information
hessjcg committed Sep 18, 2024
1 parent d380389 commit 040351e
Show file tree
Hide file tree
Showing 8 changed files with 408 additions and 66 deletions.
38 changes: 37 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,8 @@ func connect() {
// ... etc
}
```
### Using DNS to identify an instance

### Using DNS domain names to identify instances

The connector can be configured to use DNS to look up an instance. This would
allow you to configure your application to connect to a database instance, and
Expand Down Expand Up @@ -292,6 +293,41 @@ func connect() {
}
```

### Automatic fail-over using DNS domain names

When the connector is configured using a domain name, the connector will
periodically check if the DNS record for an instance changes. When the connector
detects that the domain name refers to a different instance, the connector will
close all open connections to the old instance. Subsequent connection attempts
will be directed to the new instance.

For example: suppose application is configured to connect using the
domain name `prod-db.mycompany.example.com`. Initially the corporate DNS
zone has a TXT record with the value `my-project:region:my-instance`. The
application establishes connections to the `my-project:region:my-instance`
Cloud SQL instance.

Then, to reconfigure the application using a different database
instance: `my-project:other-region:my-instance-2`. You update the DNS record
for `prod-db.mycompany.example.com` with the target
`my-project:other-region:my-instance-2`

The connector inside the application detects the change to this
DNS entry. Now, when the application connects to its database using the
domain name `prod-db.mycompany.example.com`, it will connect to the
`my-project:other-region:my-instance-2` Cloud SQL instance.

The connector will automatically close all existing connections to
`my-project:region:my-instance`. This will force the connection pools to
establish new connections. Also, it may cause database queries in progress
to fail.

The connector will poll for changes to the DNS name every 30 seconds by default.
You may configure the frequency of the connections using the option
`WithFailoverPeriod(d time.Duration)`. When this is set to 0, the connector will
disable polling and only check if the DNS record changed when it is
creating a new connection.


### Using Options

Expand Down
64 changes: 27 additions & 37 deletions dialer.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,41 +107,10 @@ type connectionInfoCache interface {
ConnectionInfo(context.Context) (cloudsql.ConnectionInfo, error)
UpdateRefresh(*bool)
ForceRefresh()
UseIAMAuthN() bool
io.Closer
}

// monitoredCache is a wrapper around a connectionInfoCache that tracks the
// number of connections to the associated instance.
type monitoredCache struct {
openConnsCount *uint64

connectionInfoCache
}

func (c *monitoredCache) Close() error {
return c.connectionInfoCache.Close()
}

func (c *monitoredCache) ForceRefresh() {
if c == nil || c.connectionInfoCache == nil {
return
}
c.connectionInfoCache.ForceRefresh()
}

func (c *monitoredCache) UpdateRefresh(b *bool) {
if c == nil || c.connectionInfoCache == nil {
return
}
c.connectionInfoCache.UpdateRefresh(b)
}
func (c *monitoredCache) ConnectionInfo(ctx context.Context) (cloudsql.ConnectionInfo, error) {
if c == nil || c.connectionInfoCache == nil {
return cloudsql.ConnectionInfo{}, nil
}
return c.connectionInfoCache.ConnectionInfo(ctx)
}

// A Dialer is used to create connections to Cloud SQL instances.
//
// Use NewDialer to initialize a Dialer.
Expand Down Expand Up @@ -178,7 +147,8 @@ type Dialer struct {
iamTokenSource oauth2.TokenSource

// resolver converts instance names into DNS names.
resolver instance.ConnectionNameResolver
resolver instance.ConnectionNameResolver
failoverPeriod time.Duration
}

var (
Expand All @@ -202,6 +172,7 @@ func NewDialer(ctx context.Context, opts ...Option) (*Dialer, error) {
logger: nullLogger{},
useragents: []string{userAgent},
serviceUniverse: "googleapis.com",
failoverPeriod: cloudsql.FailoverPeriod,
}
for _, opt := range opts {
opt(cfg)
Expand All @@ -215,6 +186,7 @@ func NewDialer(ctx context.Context, opts ...Option) (*Dialer, error) {
if cfg.setIAMAuthNTokenSource && !cfg.useIAMAuthN {
return nil, errUseTokenSource
}

// Add this to the end to make sure it's not overridden
cfg.sqladminOpts = append(cfg.sqladminOpts, option.WithUserAgent(strings.Join(cfg.useragents, " ")))

Expand Down Expand Up @@ -297,7 +269,9 @@ func NewDialer(ctx context.Context, opts ...Option) (*Dialer, error) {
iamTokenSource: cfg.iamLoginTokenSource,
dialFunc: cfg.dialFunc,
resolver: r,
failoverPeriod: cfg.failoverPeriod,
}

return d, nil
}

Expand Down Expand Up @@ -413,6 +387,13 @@ func (d *Dialer) Dial(ctx context.Context, icn string, opts ...DialOption) (conn
trace.RecordOpenConnections(context.Background(), int64(n), d.dialerID, cn.String())
}, d.dialerID, cn.String())

// If this connection was opened using a Domain Name, then store it for later
// in case it needs to be forcibly closed.
if cn.DomainName() != "" {
c.mu.Lock()
c.openConns = append(c.openConns, iConn)
c.mu.Unlock()
}
return iConn, nil
}

Expand Down Expand Up @@ -514,6 +495,7 @@ func newInstrumentedConn(conn net.Conn, closeFunc func(), dialerID, connName str
type instrumentedConn struct {
net.Conn
closeFunc func()
mu sync.RWMutex
closed bool
dialerID string
connName string
Expand All @@ -539,9 +521,18 @@ func (i *instrumentedConn) Write(b []byte) (int, error) {
return bytesWritten, err
}

// isClosed returns true if this connection is closing or is already closed.
func (i *instrumentedConn) isClosed() bool {
i.mu.RLock()
defer i.mu.RUnlock()
return i.closed
}

// Close delegates to the underlying net.Conn interface and reports the close
// to the provided closeFunc only when Close returns no error.
func (i *instrumentedConn) Close() error {
i.mu.Lock()
defer i.mu.Unlock()
i.closed = true
err := i.Conn.Close()
if err != nil {
Expand Down Expand Up @@ -582,6 +573,8 @@ func (d *Dialer) connectionInfoCache(
})

if old != nil {
// ensure that the old cache entry is closed.
// monitoredCache.Close() may be called safely, even if old is closed.
old.Close()
}

Expand Down Expand Up @@ -621,10 +614,7 @@ func (d *Dialer) createConnectionInfoCache(
d.dialerID, useIAMAuthNDial,
)
}
c := &monitoredCache{
openConnsCount: new(uint64),
connectionInfoCache: cache,
}
c := newMonitoredCache(ctx, cache, cn, d.failoverPeriod, d.resolver, d.logger)

c.UpdateRefresh(useIAMAuthN)

Expand Down
9 changes: 7 additions & 2 deletions dialer_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ func (d *dialerCache) getOrAdd(cn instance.ConnName, f func() (*monitoredCache,
d.mu.RLock()
c, ok := d.cache[cn]
d.mu.RUnlock()
if ok {
if ok && !c.isClosed() {
return c, oldC, nil
}

Expand All @@ -100,7 +100,12 @@ func (d *dialerCache) getOrAdd(cn instance.ConnName, f func() (*monitoredCache,
// Look up in the map by CN again
c, ok = d.cache[cn]
if ok {
return c, nil, nil
if !c.isClosed() {
return c, nil, nil
}
// c is closed, therefore remove it from the cache.
oldC = c
delete(d.cache, cn)
}

// Try to get an instance with the same domain name but different instance
Expand Down
Loading

0 comments on commit 040351e

Please sign in to comment.