Skip to content

Commit

Permalink
fix(backend): Fix performance issue within a mysql request (#9680)
Browse files Browse the repository at this point in the history
* fix(backend): Fix performance issue within a mysql request

Reprace the existing mysql request that use nested select, with inner join for better performance.
The fix levarage 'SQLDialect' interface, because the new request is not supported by sqllite (used for testing)
This interface bridges the difference between mysql (production) and sqlite
// (test)
Issue: #6845

Signed-off-by: diana <difince@gmail.com>

* For sqlite use UPDATE FROM to join the target table against another table in the database in order to help compute

Try to generalize the method in SQLDialect interface

Signed-off-by: diana <difince@gmail.com>

* Add unit tests

Signed-off-by: diana <difince@gmail.com>

* Replace nested query for Jobs and start using pre-comit

Signed-off-by: diana <difince@gmail.com>

* Fix: Use LEFT JOIN instead of INNER JOIN

Signed-off-by: diana <difince@gmail.com>

---------

Signed-off-by: diana <difince@gmail.com>
  • Loading branch information
difince authored and chensun committed Aug 17, 2023
1 parent e03e312 commit 81618d0
Show file tree
Hide file tree
Showing 3 changed files with 58 additions and 55 deletions.
13 changes: 13 additions & 0 deletions backend/src/apiserver/storage/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ type SQLDialect interface {

// Inserts new rows and updates duplicates based on the key column.
Upsert(query string, key string, overwrite bool, columns ...string) string

// Updates a table using UPDATE with JOIN (mysql/production) or UPDATE FROM (sqlite/test).
UpdateWithJointOrFrom(targetTable, joinTable, setClause, joinClause, whereClause string) string
}

// MySQLDialect implements SQLDialect with mysql dialect implementation.
Expand Down Expand Up @@ -88,6 +91,11 @@ func (d MySQLDialect) IsDuplicateError(err error) bool {
return ok && sqlError.Number == mysqlerr.ER_DUP_ENTRY
}

// UpdateFromOrJoin TODO(gkcalat): deprecate resource_references table once we migration to v2beta1 is available.
func (d MySQLDialect) UpdateWithJointOrFrom(targetTable, joinTable, setClause, joinClause, whereClause string) string {
return fmt.Sprintf("UPDATE %s LEFT JOIN %s ON %s SET %s WHERE %s", targetTable, joinTable, joinClause, setClause, whereClause)
}

// SQLiteDialect implements SQLDialect with sqlite dialect implementation.
type SQLiteDialect struct{}

Expand Down Expand Up @@ -131,6 +139,11 @@ func (d SQLiteDialect) IsDuplicateError(err error) bool {
return ok && sqlError.Code == sqlite3.ErrConstraint
}

// UpdateFromOrJoin TODO(gkcalat): deprecate resource_references table once we migration to v2beta1 is available.
func (d SQLiteDialect) UpdateWithJointOrFrom(targetTable, joinTable, setClause, joinClause, whereClause string) string {
return fmt.Sprintf("UPDATE %s SET %s FROM %s WHERE %s AND %s", targetTable, setClause, joinTable, joinClause, whereClause)
}

func NewMySQLDialect() MySQLDialect {
return MySQLDialect{}
}
Expand Down
30 changes: 27 additions & 3 deletions backend/src/apiserver/storage/db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,35 @@ func TestSQLiteDialect_Upsert(t *testing.T) {
}

func TestMySQLDialect_Upsert(t *testing.T) {
sqliteDialect := NewMySQLDialect()
actualQuery := sqliteDialect.Upsert(`insert into table (uuid, name, namespace) values ("a", "item1", "kubeflow"),("b", "item1", "kubeflow")`, "namespace", true, []string{"uuid", "name"}...)
mysqlDialect := NewMySQLDialect()
actualQuery := mysqlDialect.Upsert(`insert into table (uuid, name, namespace) values ("a", "item1", "kubeflow"),("b", "item1", "kubeflow")`, "namespace", true, []string{"uuid", "name"}...)
expectedQuery := `insert into table (uuid, name, namespace) values ("a", "item1", "kubeflow"),("b", "item1", "kubeflow") ON DUPLICATE KEY UPDATE uuid=VALUES(uuid),name=VALUES(name)`
assert.Equal(t, expectedQuery, actualQuery)
actualQuery2 := sqliteDialect.Upsert(`insert into table (uuid, name, namespace) values ("a", "item1", "kubeflow"),("b", "item1", "kubeflow")`, "namespace", false, []string{"uuid", "name"}...)
actualQuery2 := mysqlDialect.Upsert(`insert into table (uuid, name, namespace) values ("a", "item1", "kubeflow"),("b", "item1", "kubeflow")`, "namespace", false, []string{"uuid", "name"}...)
expectedQuery2 := `insert into table (uuid, name, namespace) values ("a", "item1", "kubeflow"),("b", "item1", "kubeflow") ON DUPLICATE KEY UPDATE uuid=uuid,name=name`
assert.Equal(t, expectedQuery2, actualQuery2)
}

func TestMySQLDialect_UpdateWithJointOrFrom(t *testing.T) {
mysqlDialect := NewMySQLDialect()
actualQuery := mysqlDialect.UpdateWithJointOrFrom(
"target_table",
"other_table",
"State = ?",
"target_table.Name = other_table.Name",
"target_table.status = ?")
expectedQuery := `UPDATE target_table LEFT JOIN other_table ON target_table.Name = other_table.Name SET State = ? WHERE target_table.status = ?`
assert.Equal(t, expectedQuery, actualQuery)
}

func TestSQLiteDialect_UpdateWithJointOrFrom(t *testing.T) {
sqliteDialect := NewSQLiteDialect()
actualQuery := sqliteDialect.UpdateWithJointOrFrom(
"target_table",
"other_table",
"State = ?",
"target_table.Name = other_table.Name",
"target_table.status = ?")
expectedQuery := `UPDATE target_table SET State = ? FROM other_table WHERE target_table.Name = other_table.Name AND target_table.status = ?`
assert.Equal(t, expectedQuery, actualQuery)
}
70 changes: 18 additions & 52 deletions backend/src/apiserver/storage/experiment_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -309,31 +309,14 @@ func (s *ExperimentStore) ArchiveExperiment(expId string) error {
"Failed to create query to archive experiment %s. error: '%v'", expId, err.Error())
}

// TODO(gkcalat): deprecate resource_references table once we migration to v2beta1 is available.
// TODO(jingzhang36): use inner join to replace nested query for better performance.
filteredRunsSql, filteredRunsArgs, err := sq.Select("ResourceUUID").
From("resource_references as rf").
Where(sq.And{
sq.Eq{"rf.ResourceType": model.RunResourceType},
sq.Eq{"rf.ReferenceUUID": expId},
sq.Eq{"rf.ReferenceType": model.ExperimentResourceType},
}).ToSql()
if err != nil {
return util.NewInternalServerError(err,
"Failed to create query to filter the runs in an experiment %s. error: '%v'", expId, err.Error())
}
updateRunsSql, updateRunsArgs, err := sq.
Update("run_details").
SetMap(sq.Eq{
"StorageState": model.StorageStateArchived.ToString(),
}).
Where(sq.NotEq{"StorageState": model.StorageStateArchived.ToString()}).
Where(fmt.Sprintf("UUID in (%s) OR ExperimentUUID = '%s'", filteredRunsSql, expId), filteredRunsArgs...).
ToSql()
if err != nil {
return util.NewInternalServerError(err,
"Failed to create query to archive the runs in an experiment %s. error: '%v'", expId, err.Error())
}
var updateRunsArgs []interface{}
updateRunsArgs = append(updateRunsArgs, model.StorageStateArchived.ToString(), model.RunResourceType, expId, model.ExperimentResourceType)
updateRunsSQL := s.db.UpdateWithJointOrFrom(
"run_details",
"resource_references",
"StorageState = ?",
"run_details.UUID = resource_references.ResourceUUID",
"resource_references.ResourceType = ? AND resource_references.ReferenceUUID = ? AND resource_references.ReferenceType = ?")

updateRunsWithExperimentUUIDSql, updateRunsWithExperimentUUIDArgs, err := sq.
Update("run_details").
Expand All @@ -348,32 +331,15 @@ func (s *ExperimentStore) ArchiveExperiment(expId string) error {
"Failed to create query to archive the runs in an experiment %s. error: '%v'", expId, err.Error())
}

// TODO(jingzhang36): use inner join to replace nested query for better performance.
filteredJobsSql, filteredJobsArgs, err := sq.Select("ResourceUUID").
From("resource_references as rf").
Where(sq.And{
sq.Eq{"rf.ResourceType": model.JobResourceType},
sq.Eq{"rf.ReferenceUUID": expId},
sq.Eq{"rf.ReferenceType": model.ExperimentResourceType},
}).ToSql()
if err != nil {
return util.NewInternalServerError(err,
"Failed to create query to filter the jobs in an experiment %s. error: '%v'", expId, err.Error())
}
var updateJobsArgs []interface{}
now := s.time.Now().Unix()
updateJobsSql, updateJobsArgs, err := sq.
Update("jobs").
SetMap(sq.Eq{
"Enabled": false,
"UpdatedAtInSec": now,
}).
Where(sq.Eq{"Enabled": true}).
Where(fmt.Sprintf("UUID in (%s) OR ExperimentUUID = '%s'", filteredJobsSql, expId), filteredJobsArgs...).
ToSql()
if err != nil {
return util.NewInternalServerError(err,
"Failed to create query to archive the jobs in an experiment %s. error: '%v'", expId, err.Error())
}
updateJobsArgs = append(updateJobsArgs, false, now, model.JobResourceType, expId, model.ExperimentResourceType)
updateJobsSQL := s.db.UpdateWithJointOrFrom(
"jobs",
"resource_references",
"Enabled = ?, UpdatedAtInSec = ?",
"jobs.UUID = resource_references.ResourceUUID",
"resource_references.ResourceType = ? AND resource_references.ReferenceUUID = ? AND resource_references.ReferenceType = ?")

// In a single transaction, we update experiments, run_details and jobs tables.
tx, err := s.db.Begin()
Expand All @@ -388,7 +354,7 @@ func (s *ExperimentStore) ArchiveExperiment(expId string) error {
"Failed to archive experiment %s. error: '%v'", expId, err.Error())
}

_, err = tx.Exec(updateRunsSql, updateRunsArgs...)
_, err = tx.Exec(updateRunsSQL, updateRunsArgs...)
if err != nil {
tx.Rollback()
return util.NewInternalServerError(err,
Expand All @@ -402,7 +368,7 @@ func (s *ExperimentStore) ArchiveExperiment(expId string) error {
"Failed to archive runs with ExperimentUUID being %s. error: '%v'", expId, err.Error())
}

_, err = tx.Exec(updateJobsSql, updateJobsArgs...)
_, err = tx.Exec(updateJobsSQL, updateJobsArgs...)
if err != nil {
tx.Rollback()
return util.NewInternalServerError(err,
Expand Down

0 comments on commit 81618d0

Please sign in to comment.