Skip to content

Commit

Permalink
New db log schema
Browse files Browse the repository at this point in the history
Signed-off-by: YujiOshima <yuji.oshima0x3fd@gmail.com>
  • Loading branch information
YujiOshima committed Apr 11, 2018
1 parent a1f30d4 commit 841f66a
Show file tree
Hide file tree
Showing 10 changed files with 319 additions and 174 deletions.
17 changes: 13 additions & 4 deletions db/db_init.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,14 +50,23 @@ func (d *db_conn) DB_Init() {
log.Fatalf("Error creating trials table: %v", err)
}

_, err = db.Exec("CREATE TABLE IF NOT EXISTS trial_logs" +
_, err = db.Exec("CREATE TABLE IF NOT EXISTS trial_metrics" +
"(trial_id CHAR(16) NOT NULL, " +
"id INT AUTO_INCREMENT PRIMARY KEY, " +
"time DATETIME(6), " +
"name VARCHAR(255), " +
"value TEXT, " +
"PRIMARY KEY (trial_id, time))")
// We can have "id INT AUTO_INCREMENT PRIMARY KEY" instead.
"is_objective TINYINT)")
if err != nil {
log.Fatalf("Error creating trial_metrics table: %v", err)
}

_, err = db.Exec("CREATE TABLE IF NOT EXISTS trial_lastlogs" +
"(trial_id CHAR(16) PRIMARY KEY, " +
"time DATETIME(6), " +
"value TEXT)")
if err != nil {
log.Fatalf("Error creating trial_logs table: %v", err)
log.Fatalf("Error creating trial_lastlogs table: %v", err)
}

_, err = db.Exec("CREATE TABLE IF NOT EXISTS workers" +
Expand Down
187 changes: 176 additions & 11 deletions db/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,16 @@ const (
)

type GetTrialLogOpts struct {
Name string
SinceTime *time.Time
Descending bool
Limit int32
Objective bool
}

type TrialLog struct {
Time string
Time time.Time
Name string
Value string
}

Expand Down Expand Up @@ -215,6 +221,16 @@ func (d *db_conn) CreateStudy(in *api.StudyConfig) (string, error) {
}
}

var isin bool = false
for _, m := range in.Metrics {
if m == in.ObjectiveValueName {
isin = true
}
}
if !isin {
in.Metrics = append(in.Metrics, in.ObjectiveValueName)
}

var study_id string
i := 3
for true {
Expand Down Expand Up @@ -397,31 +413,67 @@ func (d *db_conn) UpdateTrial(id string, newstatus api.TrialState) error {
}

func (d *db_conn) GetTrialLogs(id string, opts *GetTrialLogOpts) ([]*TrialLog, error) {
// TODO: opts not implemented
rows, err := d.db.Query("SELECT (time, value) FROM trial_logs WHERE trial_id = ? ORDER BY time", id)
qstr := ""
qfield := []interface{}{id}
order := ""
if opts != nil {
if opts.SinceTime != nil {
qstr += " AND time >= ?"
qfield = append(qfield, opts.SinceTime)
}
if opts.Name != "" {
qstr += " AND name = ?"
qfield = append(qfield, opts.Name)
}
if opts.Objective {
qstr += " AND is_objective = 1"
}
if opts.Descending {
order = " DESC"
}
if opts.Limit > 0 {
order += fmt.Sprintf(" LIMIT %d", opts.Limit)
}
}

rows, err := d.db.Query("SELECT time, name, value FROM trial_metrics WHERE trial_id = ?"+
qstr+" ORDER BY time"+order, qfield...)
if err != nil {
return nil, err
}

var result []*TrialLog
for rows.Next() {
log1 := new(TrialLog)
var time_str string

err := rows.Scan(&((*log1).Time), &((*log1).Value))
err := rows.Scan(&time_str, &((*log1).Name), &((*log1).Value))
if err != nil {
log.Printf("Error scanning log: %v", err)
continue
}
log1.Time, err = time.Parse(mysql_time_fmt, time_str)
if err != nil {
log.Printf("Error parsing time %s: %v", time_str, err)
continue
}
result = append(result, log1)
}
return result, nil
}

func (d *db_conn) GetTrialTimestamp(id string) (*time.Time, error) {
func (d *db_conn) getTrialLastlog(id string, value *string) (*time.Time, error) {
var last_timestamp string
var err error

if value != nil {
row := d.db.QueryRow("SELECT time, value FROM trial_lastlogs WHERE trial_id = ?", id)
err = row.Scan(&last_timestamp, value)
} else {
row := d.db.QueryRow("SELECT time FROM trial_lastlogs WHERE trial_id = ?", id)
err = row.Scan(&last_timestamp)
}

row := d.db.QueryRow("SELECT time FROM trial_logs WHERE trial_id = ? ORDER BY time DESC LIMIT 1", id)
err := row.Scan(&last_timestamp)
switch {
case err == sql.ErrNoRows:
return nil, nil
Expand All @@ -438,14 +490,71 @@ func (d *db_conn) GetTrialTimestamp(id string) (*time.Time, error) {
}
}

func (d *db_conn) GetTrialTimestamp(id string) (*time.Time, error) {
return d.getTrialLastlog(id, nil)
}

func (d *db_conn) storeTrialLog(trial_id string, time string, line string,
objective_value_name string, metrics []string) error {
kvpairs := strings.Fields(line)
for _, kv := range kvpairs {
v := strings.Split(kv, "=")
if len(v) > 2 {
log.Printf("Ignoring trailing garbage: %s", kv)
}
if len(v) == 1 {
continue
}
is_objective := 0
search_keyword:
switch {
case v[0] == objective_value_name:
is_objective = 1
default:
for _, m := range metrics {
if v[0] == m {
break search_keyword
}
}
continue
}
_, err := d.db.Exec("INSERT INTO trial_metrics (trial_id, time, name, value, is_objective) VALUES (?, ?, ?, ?, ?)",
trial_id, time, v[0], v[1], is_objective)
if err != nil {
return err
}
}
return nil
}

func (d *db_conn) StoreTrialLogs(trial_id string, logs []string) error {
var lasterr error
var last_value string
var stored_logs []*string

db_t, err := d.getTrialLastlog(trial_id, &last_value)
if err != nil {
log.Printf("Error getting last log timestamp: %v", err)
}

row := d.db.QueryRow("SELECT objective_value_name, metrics FROM trials "+
"JOIN (studies) ON (trials.study_id = studies.id) WHERE "+
"trials.id = ?", trial_id)
var objective_value_name, metrics_str string
err = row.Scan(&objective_value_name, &metrics_str)
if err != nil {
log.Printf("Cannot get objective_value_name or metrics: %v", err)
return err
}
metrics := strings.Split(metrics_str, ",\n")

var formatted_time string
var ls []string
for _, logline := range logs {
if logline == "" {
continue
}
ls := strings.SplitN(logline, " ", 2)
ls = strings.SplitN(logline, " ", 2)
if len(ls) != 2 {
log.Printf("Error parsing log: %s", logline)
lasterr = errors.New("Error parsing log")
Expand All @@ -457,15 +566,71 @@ func (d *db_conn) StoreTrialLogs(trial_id string, logs []string) error {
lasterr = err
continue
}
if db_t != nil && t.Before(*db_t) {
// db_t is from mysql and has microsec precision.
// This code assumes nanosec fractions are rounded down.
continue
}
// use UTC as mysql DATETIME lacks timezone
_, err = d.db.Exec("INSERT INTO trial_logs VALUES (?, ?, ?)",
trial_id, t.UTC().Format("2006-01-02 15:04:05.999999"), ls[1])
formatted_time = t.UTC().Format(mysql_time_fmt)
if db_t != nil {
// Parse again to get rounding effect
reparsed_time, err := time.Parse(mysql_time_fmt, formatted_time)
if reparsed_time == *db_t {
if ls[1] == last_value {
// stored_logs are already in DB
// This assignment ensures the remaining
// logs will be stored in DB.
db_t = nil
continue
}
// We don't know this is necessary or not yet.
stored_logs = append(stored_logs, &ls[1])
continue
}
// (reparsed_time > *db_t) can be assumed
for _, value := range stored_logs {
err = d.storeTrialLog(trial_id,
db_t.UTC().Format(mysql_time_fmt), *value,
objective_value_name, metrics)
if err != nil {
log.Printf("Error storing log %s: %v", *value, err)
lasterr = err
}
}
db_t = nil
}

err = d.storeTrialLog(trial_id,
formatted_time, ls[1],
objective_value_name, metrics)
if err != nil {
log.Printf("Error storing log %s: %v", logline, err)
lasterr = err
}
}
return lasterr
if db_t != nil && len(stored_logs) > 0 {
// No duplicate log found. So they are valid.
for _, value := range stored_logs {
err = d.storeTrialLog(trial_id,
db_t.UTC().Format(mysql_time_fmt), *value,
objective_value_name, metrics)
if err != nil {
log.Printf("Error storing log %s: %v", *value, err)
lasterr = err
}
}
}
if lasterr != nil {
// If lastlog were updated, logs that couldn't be saved
// would be lost.
return lasterr
}
if len(ls) == 2 {
_, err = d.db.Exec("REPLACE INTO trial_lastlogs VALUES (?, ?, ?)",
trial_id, formatted_time, ls[1])
}
return err
}

func (d *db_conn) DeleteTrial(id string) error {
Expand Down
1 change: 1 addition & 0 deletions manager/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ RUN go build -o vizier-manager
FROM alpine:3.7
WORKDIR /app
COPY --from=build-env /go/src/github.com/kubeflow/hp-tuning/manager/vizier-manager /app/
COPY --from=build-env /go/src/github.com/kubeflow/hp-tuning/manager/visualise /
ENTRYPOINT ["./vizier-manager"]
CMD ["-w", "dlk"]
2 changes: 1 addition & 1 deletion manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func (s *server) trialIteration(conf *pb.StudyConfig, study_id string, sCh study
select {
case <-tm.C:
if conf.SuggestAlgorithm != "" {
err := s.wIF.CheckRunningTrials(study_id, conf.ObjectiveValueName, conf.Metrics)
err := s.wIF.CheckRunningTrials(study_id, conf.ObjectiveValueName)
if err != nil {
return err
}
Expand Down
15 changes: 3 additions & 12 deletions manager/visualise/tensorboard/tensorboardIF.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@ package tensorboard

import (
"bytes"
"fmt"
"github.com/kubeflow/hp-tuning/api"
vdb "github.com/kubeflow/hp-tuning/db"
"io/ioutil"
apiv1 "k8s.io/api/core/v1"
exbeatav1 "k8s.io/api/extensions/v1beta1"
Expand All @@ -31,34 +29,27 @@ func SpawnTensorBoard(sid string, tid string, namespace string, mount *api.Mount
BUFSIZE := 1024
var tFile []byte
var err error
var dbIf vdb.VizierDBInterface
dbIf = vdb.New()
sc, err := dbIf.GetStudyConfig(sid)
if err != nil {
return err
}

dep := exbeatav1.Deployment{}
tFile, err = ioutil.ReadFile("visualise/tensorboard/manifest_template/deployment.yaml")
tFile, err = ioutil.ReadFile("/tensorboard/manifest_template/deployment.yaml")
if err != nil {
return err
}
k8syaml.NewYAMLOrJSONDecoder(bytes.NewReader(tFile), BUFSIZE).Decode(&dep)

ing := exbeatav1.Ingress{}
tFile, err = ioutil.ReadFile("visualise/tensorboard/manifest_template/ingress.yaml")
tFile, err = ioutil.ReadFile("/tensorboard/manifest_template/ingress.yaml")
if err != nil {
return err
}
k8syaml.NewYAMLOrJSONDecoder(bytes.NewReader(tFile), BUFSIZE).Decode(&ing)

svc := apiv1.Service{}
tFile, err = ioutil.ReadFile("visualise/tensorboard/manifest_template/service.yaml")
tFile, err = ioutil.ReadFile("/tensorboard/manifest_template/service.yaml")
if err != nil {
return err
}
k8syaml.NewYAMLOrJSONDecoder(bytes.NewReader(tFile), BUFSIZE).Decode(&svc)
fmt.Printf("sc conf in TB %v\n", sc)

tname := "tensorboard-" + sid + "-" + tid

Expand Down
Loading

0 comments on commit 841f66a

Please sign in to comment.