Skip to content

Commit

Permalink
[FABC-816] Add connection string to db runners
Browse files Browse the repository at this point in the history
Added a function to the MySQL and Postgres db
runners to return the connection string specific
to the implementation.

Removed address parameter of the readycheck function
as the readiness check now depends on a direct connection
to the db, rather than dialing the endpoing

Change-Id: Ia7835c1b43953068c1397e218abc61f515d7b3b3
Signed-off-by: Brett Logan <Brett.T.Logan@ibm.com>
  • Loading branch information
Brett Logan authored and Brett Logan committed Feb 17, 2019
1 parent a60d006 commit fc4a993
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 33 deletions.
26 changes: 13 additions & 13 deletions integration/runner/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ type MySQL struct {
containerID string
hostAddress string
containerAddress string
address string

mutex sync.Mutex
stopped bool
Expand Down Expand Up @@ -158,10 +157,8 @@ func (c *MySQL) Run(sigCh <-chan os.Signal, ready chan<- struct{}) error {
return errors.Wrapf(ctx.Err(), "database in container %s did not start", c.containerID)
case <-containerExit:
return errors.New("container exited before ready")
case <-c.ready(ctx, c.hostAddress):
c.address = c.hostAddress
case <-c.ready(ctx, c.containerAddress):
c.address = c.containerAddress
case <-c.ready(ctx):
break
}

cancel()
Expand Down Expand Up @@ -191,11 +188,11 @@ func (c *MySQL) endpointReady(ctx context.Context, db *sqlx.DB) bool {
return true
}

func (c *MySQL) ready(ctx context.Context, addr string) <-chan struct{} {
func (c *MySQL) ready(ctx context.Context) <-chan struct{} {
readyCh := make(chan struct{})

str := fmt.Sprintf("root:@(%s:%d)/mysql", c.HostIP, c.HostPort)
db, err := sqlx.Open("mysql", str)
connStr, _ := c.GetConnectionString()
db, err := sqlx.Open("mysql", connStr)
db.SetConnMaxLifetime(time.Second * 5)
if err != nil {
ctx.Done()
Expand Down Expand Up @@ -258,11 +255,6 @@ func (c *MySQL) streamLogs(ctx context.Context) {
stdcopy.StdCopy(c.OutputStream, c.ErrorStream, out)
}

// Address returns the address successfully used by the readiness check.
func (c *MySQL) Address() string {
return c.address
}

// HostAddress returns the host address where this MySQL instance is available.
func (c *MySQL) HostAddress() string {
return c.hostAddress
Expand Down Expand Up @@ -308,3 +300,11 @@ func (c *MySQL) Stop() error {

return nil
}

// GetConnectionString returns the sql connection string for connecting to the DB
func (c *MySQL) GetConnectionString() (string, error) {
if c.HostIP != "" && c.HostPort != 0 {
return fmt.Sprintf("root:@(%s:%d)/mysql", c.HostIP, c.HostPort), nil
}
return "", fmt.Errorf("mysql db not initialized")
}
42 changes: 22 additions & 20 deletions integration/runner/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ type PostgresDB struct {
containerID string
hostAddress string
containerAddress string
address string

mutex sync.Mutex
stopped bool
Expand Down Expand Up @@ -155,10 +154,8 @@ func (c *PostgresDB) Run(sigCh <-chan os.Signal, ready chan<- struct{}) error {
return errors.Wrapf(ctx.Err(), "database in container %s did not start", c.containerID)
case <-containerExit:
return errors.New("container exited before ready")
case <-c.ready(ctx, c.hostAddress):
c.address = c.hostAddress
case <-c.ready(ctx, c.containerAddress):
c.address = c.containerAddress
case <-c.ready(ctx):
break
}

cancel()
Expand All @@ -178,14 +175,8 @@ func (c *PostgresDB) Run(sigCh <-chan os.Signal, ready chan<- struct{}) error {
}
}

func (c *PostgresDB) endpointReady(ctx context.Context, addr string) bool {
dataSource := fmt.Sprintf("host=%s port=%d user=postgres dbname=postgres sslmode=disable", c.HostIP, c.HostPort)
db, err := sqlx.Open("postgres", dataSource)
if err != nil {
return false
}

_, err = db.Conn(ctx)
func (c *PostgresDB) endpointReady(ctx context.Context, db *sqlx.DB) bool {
_, err := db.Conn(ctx)
if err != nil {
return false
}
Expand All @@ -194,13 +185,20 @@ func (c *PostgresDB) endpointReady(ctx context.Context, addr string) bool {
return true
}

func (c *PostgresDB) ready(ctx context.Context, addr string) <-chan struct{} {
func (c *PostgresDB) ready(ctx context.Context) <-chan struct{} {
readyCh := make(chan struct{})

connStr, _ := c.GetConnectionString()
db, err := sqlx.Open("postgres", connStr)
if err != nil {
ctx.Done()
}

go func() {
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
for {
if c.endpointReady(ctx, addr) {
if c.endpointReady(ctx, db) {
close(readyCh)
return
}
Expand Down Expand Up @@ -253,11 +251,6 @@ func (c *PostgresDB) streamLogs(ctx context.Context) {
stdcopy.StdCopy(c.OutputStream, c.ErrorStream, out)
}

// Address returns the address successfully used by the readiness check.
func (c *PostgresDB) Address() string {
return c.address
}

// HostAddress returns the host address where this PostgresDB instance is available.
func (c *PostgresDB) HostAddress() string {
return c.hostAddress
Expand Down Expand Up @@ -303,3 +296,12 @@ func (c *PostgresDB) Stop() error {

return nil
}

// GetConnectionString returns the sql connection string for connecting to the DB
func (c *PostgresDB) GetConnectionString() (string, error) {
if c.HostIP != "" && c.HostPort != 0 {
return fmt.Sprintf("host=%s port=%d user=postgres dbname=postgres sslmode=disable",
c.HostIP, c.HostPort), nil
}
return "", fmt.Errorf("DB not initialized")
}

0 comments on commit fc4a993

Please sign in to comment.