Skip to content

Commit

Permalink
Simple db connect loop
Browse files Browse the repository at this point in the history
Signed-off-by: Koichiro Den <den@valinux.co.jp>
  • Loading branch information
Koichiro Den committed Nov 28, 2018
1 parent a0d4d06 commit dc7bb07
Showing 1 changed file with 22 additions and 50 deletions.
72 changes: 22 additions & 50 deletions pkg/db/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,6 @@ import (
"time"

api "github.com/kubeflow/katib/pkg/api"
"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
restclient "k8s.io/client-go/rest"

_ "github.com/go-sql-driver/mysql"
)
Expand All @@ -27,6 +22,9 @@ const (
dbDriver = "mysql"
dbNameTmpl = "root:%s@tcp(vizier-db:3306)/vizier"
mysqlTimeFmt = "2006-01-02 15:04:05.999999"

connectInterval = 5 * time.Second
connectTimeout = 60 * time.Second
)

type GetWorkerLogOpts struct {
Expand Down Expand Up @@ -95,46 +93,6 @@ func getDbName() string {
return fmt.Sprintf(dbNameTmpl, dbPass)
}

func waitForDbReady(timeToWait time.Duration) error {
config, err := restclient.InClusterConfig()
if err != nil {
return err
}
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
return err
}

// TODO: factor out and use common definitions.
namespace := "kubeflow"
selectors := labels.Set{
"app": "vizier",
"component": "db",
}.AsSelectorPreValidated()
listOptions := metav1.ListOptions{
LabelSelector: selectors.String(),
}

watch, err := clientset.CoreV1().Pods(namespace).Watch(listOptions)
if err != nil {
return fmt.Errorf("Failed to set up watch for pod %v (error: %v)", listOptions, err)
}
events := watch.ResultChan()

startTime := time.Now()
for {
select {
case event := <-events:
pod := event.Object.(*v1.Pod)
if pod.Status.Phase == v1.PodRunning && pod.Status.ContainerStatuses[0].Ready {
return nil
}
case <-time.After(timeToWait - time.Since(startTime)):
return fmt.Errorf("Timeout waiting for DB ready")
}
}
}

func NewWithSQLConn(db *sql.DB) VizierDBInterface {
d := new(dbConn)
d.db = db
Expand All @@ -150,13 +108,27 @@ func NewWithSQLConn(db *sql.DB) VizierDBInterface {
}

func New() VizierDBInterface {
timeToWait := time.Duration(30) * time.Second
err := waitForDbReady(timeToWait)
if err != nil {
log.Fatalf("DB is not ready: %v", err)
var db *sql.DB
var err error

ticker := time.NewTicker(connectInterval)
defer ticker.Stop()
startTime := time.Now()

loop:
for {
select {
case <-ticker.C:
if db, err = sql.Open(dbDriver, getDbName()); err == nil {
if err = db.Ping(); err == nil {
break loop
}
}
case <-time.After(connectTimeout - time.Since(startTime)):
break loop
}
}

db, err := sql.Open(dbDriver, getDbName())
if err != nil {
log.Fatalf("DB open failed: %v", err)
}
Expand Down

0 comments on commit dc7bb07

Please sign in to comment.