From fc4a9930564a0a6af57c1c51a3b383cfe68a65cc Mon Sep 17 00:00:00 2001 From: Brett Logan Date: Sun, 17 Feb 2019 13:57:52 -0500 Subject: [PATCH] [FABC-816] Add connection string to db runners Added a function to the MySQL and Postgres db runners to return the connection string specific to the implementation. Removed address parameter of the readycheck function as the readiness check now depends on a direct connection to the db, rather than dialing the endpoing Change-Id: Ia7835c1b43953068c1397e218abc61f515d7b3b3 Signed-off-by: Brett Logan --- integration/runner/mysql.go | 26 ++++++++++----------- integration/runner/postgres.go | 42 ++++++++++++++++++---------------- 2 files changed, 35 insertions(+), 33 deletions(-) diff --git a/integration/runner/mysql.go b/integration/runner/mysql.go index adae3d73a..c1a20a5f4 100644 --- a/integration/runner/mysql.go +++ b/integration/runner/mysql.go @@ -47,7 +47,6 @@ type MySQL struct { containerID string hostAddress string containerAddress string - address string mutex sync.Mutex stopped bool @@ -158,10 +157,8 @@ func (c *MySQL) Run(sigCh <-chan os.Signal, ready chan<- struct{}) error { return errors.Wrapf(ctx.Err(), "database in container %s did not start", c.containerID) case <-containerExit: return errors.New("container exited before ready") - case <-c.ready(ctx, c.hostAddress): - c.address = c.hostAddress - case <-c.ready(ctx, c.containerAddress): - c.address = c.containerAddress + case <-c.ready(ctx): + break } cancel() @@ -191,11 +188,11 @@ func (c *MySQL) endpointReady(ctx context.Context, db *sqlx.DB) bool { return true } -func (c *MySQL) ready(ctx context.Context, addr string) <-chan struct{} { +func (c *MySQL) ready(ctx context.Context) <-chan struct{} { readyCh := make(chan struct{}) - str := fmt.Sprintf("root:@(%s:%d)/mysql", c.HostIP, c.HostPort) - db, err := sqlx.Open("mysql", str) + connStr, _ := c.GetConnectionString() + db, err := sqlx.Open("mysql", connStr) db.SetConnMaxLifetime(time.Second * 5) if err != nil { ctx.Done() @@ -258,11 +255,6 @@ func (c *MySQL) streamLogs(ctx context.Context) { stdcopy.StdCopy(c.OutputStream, c.ErrorStream, out) } -// Address returns the address successfully used by the readiness check. -func (c *MySQL) Address() string { - return c.address -} - // HostAddress returns the host address where this MySQL instance is available. func (c *MySQL) HostAddress() string { return c.hostAddress @@ -308,3 +300,11 @@ func (c *MySQL) Stop() error { return nil } + +// GetConnectionString returns the sql connection string for connecting to the DB +func (c *MySQL) GetConnectionString() (string, error) { + if c.HostIP != "" && c.HostPort != 0 { + return fmt.Sprintf("root:@(%s:%d)/mysql", c.HostIP, c.HostPort), nil + } + return "", fmt.Errorf("mysql db not initialized") +} diff --git a/integration/runner/postgres.go b/integration/runner/postgres.go index 13b2aa181..902934a4d 100644 --- a/integration/runner/postgres.go +++ b/integration/runner/postgres.go @@ -47,7 +47,6 @@ type PostgresDB struct { containerID string hostAddress string containerAddress string - address string mutex sync.Mutex stopped bool @@ -155,10 +154,8 @@ func (c *PostgresDB) Run(sigCh <-chan os.Signal, ready chan<- struct{}) error { return errors.Wrapf(ctx.Err(), "database in container %s did not start", c.containerID) case <-containerExit: return errors.New("container exited before ready") - case <-c.ready(ctx, c.hostAddress): - c.address = c.hostAddress - case <-c.ready(ctx, c.containerAddress): - c.address = c.containerAddress + case <-c.ready(ctx): + break } cancel() @@ -178,14 +175,8 @@ func (c *PostgresDB) Run(sigCh <-chan os.Signal, ready chan<- struct{}) error { } } -func (c *PostgresDB) endpointReady(ctx context.Context, addr string) bool { - dataSource := fmt.Sprintf("host=%s port=%d user=postgres dbname=postgres sslmode=disable", c.HostIP, c.HostPort) - db, err := sqlx.Open("postgres", dataSource) - if err != nil { - return false - } - - _, err = db.Conn(ctx) +func (c *PostgresDB) endpointReady(ctx context.Context, db *sqlx.DB) bool { + _, err := db.Conn(ctx) if err != nil { return false } @@ -194,13 +185,20 @@ func (c *PostgresDB) endpointReady(ctx context.Context, addr string) bool { return true } -func (c *PostgresDB) ready(ctx context.Context, addr string) <-chan struct{} { +func (c *PostgresDB) ready(ctx context.Context) <-chan struct{} { readyCh := make(chan struct{}) + + connStr, _ := c.GetConnectionString() + db, err := sqlx.Open("postgres", connStr) + if err != nil { + ctx.Done() + } + go func() { ticker := time.NewTicker(100 * time.Millisecond) defer ticker.Stop() for { - if c.endpointReady(ctx, addr) { + if c.endpointReady(ctx, db) { close(readyCh) return } @@ -253,11 +251,6 @@ func (c *PostgresDB) streamLogs(ctx context.Context) { stdcopy.StdCopy(c.OutputStream, c.ErrorStream, out) } -// Address returns the address successfully used by the readiness check. -func (c *PostgresDB) Address() string { - return c.address -} - // HostAddress returns the host address where this PostgresDB instance is available. func (c *PostgresDB) HostAddress() string { return c.hostAddress @@ -303,3 +296,12 @@ func (c *PostgresDB) Stop() error { return nil } + +// GetConnectionString returns the sql connection string for connecting to the DB +func (c *PostgresDB) GetConnectionString() (string, error) { + if c.HostIP != "" && c.HostPort != 0 { + return fmt.Sprintf("host=%s port=%d user=postgres dbname=postgres sslmode=disable", + c.HostIP, c.HostPort), nil + } + return "", fmt.Errorf("DB not initialized") +}