From 383e558b6fa25141fd4b82a3ddba097c8c5ce1c5 Mon Sep 17 00:00:00 2001 From: diana Date: Fri, 23 Jun 2023 11:27:48 +0300 Subject: [PATCH 1/5] fix(backend): Fix performance issue within a mysql request Reprace the existing mysql request that use nested select, with inner join for better performance. The fix levarage 'SQLDialect' interface, because the new request is not supported by sqllite (used for testing) This interface bridges the difference between mysql (production) and sqlite // (test) Issue: https://github.com/kubeflow/pipelines/issues/6845 Signed-off-by: diana --- backend/src/apiserver/storage/db.go | 19 ++++++++++++ .../src/apiserver/storage/experiment_store.go | 29 ++----------------- 2 files changed, 21 insertions(+), 27 deletions(-) diff --git a/backend/src/apiserver/storage/db.go b/backend/src/apiserver/storage/db.go index d15d66c72dc..467ddf3171e 100644 --- a/backend/src/apiserver/storage/db.go +++ b/backend/src/apiserver/storage/db.go @@ -23,6 +23,8 @@ import ( "github.com/VividCortex/mysqlerr" "github.com/go-sql-driver/mysql" sqlite3 "github.com/mattn/go-sqlite3" + + "github.com/kubeflow/pipelines/backend/src/apiserver/model" ) // DB a struct wrapping plain sql library with SQL dialect, to solve any feature @@ -59,6 +61,9 @@ type SQLDialect interface { // Inserts new rows and updates duplicates based on the key column. Upsert(query string, key string, overwrite bool, columns ...string) string + + // Updates a run_details based on join (production) or inner select (test). + UpdateRunDetailsStorageState(expId string) (string, []interface{}) } // MySQLDialect implements SQLDialect with mysql dialect implementation. @@ -88,6 +93,13 @@ func (d MySQLDialect) IsDuplicateError(err error) bool { return ok && sqlError.Number == mysqlerr.ER_DUP_ENTRY } +// TODO(gkcalat): deprecate resource_references table once we migration to v2beta1 is available. +func (d MySQLDialect) UpdateRunDetailsStorageState(expId string) (string, []interface{}) { + var args []interface{} + args = append(args, model.StorageStateArchived.ToString(), model.RunResourceType, expId, model.ExperimentResourceType) + return "UPDATE run_details as runs JOIN resource_references as rf ON runs.UUID = rf.ResourceUUID SET StorageState = ? WHERE rf.ResourceType = ? AND rf.ReferenceUUID = ? AND rf.ReferenceType = ?", args +} + // SQLiteDialect implements SQLDialect with sqlite dialect implementation. type SQLiteDialect struct{} @@ -131,6 +143,13 @@ func (d SQLiteDialect) IsDuplicateError(err error) bool { return ok && sqlError.Code == sqlite3.ErrConstraint } +// TODO(gkcalat): deprecate resource_references table once we migration to v2beta1 is available. +func (d SQLiteDialect) UpdateRunDetailsStorageState(expId string) (string, []interface{}) { + var args []interface{} + args = append(args, model.StorageStateArchived.ToString(), model.StorageStateArchived.ToString(), model.RunResourceType, expId, model.ExperimentResourceType, expId) + return "UPDATE run_details SET StorageState = ? WHERE StorageState <> ? AND UUID in (SELECT ResourceUUID FROM resource_references as rf WHERE (rf.ResourceType = ? AND rf.ReferenceUUID = ? AND rf.ReferenceType = ?)) OR ExperimentUUID = ?", args +} + func NewMySQLDialect() MySQLDialect { return MySQLDialect{} } diff --git a/backend/src/apiserver/storage/experiment_store.go b/backend/src/apiserver/storage/experiment_store.go index d254537a28d..49a00b56da5 100644 --- a/backend/src/apiserver/storage/experiment_store.go +++ b/backend/src/apiserver/storage/experiment_store.go @@ -309,32 +309,6 @@ func (s *ExperimentStore) ArchiveExperiment(expId string) error { "Failed to create query to archive experiment %s. error: '%v'", expId, err.Error()) } - // TODO(gkcalat): deprecate resource_references table once we migration to v2beta1 is available. - // TODO(jingzhang36): use inner join to replace nested query for better performance. - filteredRunsSql, filteredRunsArgs, err := sq.Select("ResourceUUID"). - From("resource_references as rf"). - Where(sq.And{ - sq.Eq{"rf.ResourceType": model.RunResourceType}, - sq.Eq{"rf.ReferenceUUID": expId}, - sq.Eq{"rf.ReferenceType": model.ExperimentResourceType}, - }).ToSql() - if err != nil { - return util.NewInternalServerError(err, - "Failed to create query to filter the runs in an experiment %s. error: '%v'", expId, err.Error()) - } - updateRunsSql, updateRunsArgs, err := sq. - Update("run_details"). - SetMap(sq.Eq{ - "StorageState": model.StorageStateArchived.ToString(), - }). - Where(sq.NotEq{"StorageState": model.StorageStateArchived.ToString()}). - Where(fmt.Sprintf("UUID in (%s) OR ExperimentUUID = '%s'", filteredRunsSql, expId), filteredRunsArgs...). - ToSql() - if err != nil { - return util.NewInternalServerError(err, - "Failed to create query to archive the runs in an experiment %s. error: '%v'", expId, err.Error()) - } - updateRunsWithExperimentUUIDSql, updateRunsWithExperimentUUIDArgs, err := sq. Update("run_details"). SetMap(sq.Eq{ @@ -388,7 +362,8 @@ func (s *ExperimentStore) ArchiveExperiment(expId string) error { "Failed to archive experiment %s. error: '%v'", expId, err.Error()) } - _, err = tx.Exec(updateRunsSql, updateRunsArgs...) + query, arguments := s.db.UpdateRunDetailsStorageState(expId) + _, err = tx.Exec(query, arguments...) if err != nil { tx.Rollback() return util.NewInternalServerError(err, From 620352be024ef369e3575ed71643b32577aff451 Mon Sep 17 00:00:00 2001 From: diana Date: Tue, 4 Jul 2023 15:06:56 +0300 Subject: [PATCH 2/5] For sqlite use UPDATE FROM to join the target table against another table in the database in order to help compute Try to generalize the method in SQLDialect interface Signed-off-by: diana --- backend/src/apiserver/storage/db.go | 22 +++++++------------ .../src/apiserver/storage/experiment_store.go | 9 +++++++- 2 files changed, 16 insertions(+), 15 deletions(-) diff --git a/backend/src/apiserver/storage/db.go b/backend/src/apiserver/storage/db.go index 467ddf3171e..2eea820df63 100644 --- a/backend/src/apiserver/storage/db.go +++ b/backend/src/apiserver/storage/db.go @@ -23,8 +23,6 @@ import ( "github.com/VividCortex/mysqlerr" "github.com/go-sql-driver/mysql" sqlite3 "github.com/mattn/go-sqlite3" - - "github.com/kubeflow/pipelines/backend/src/apiserver/model" ) // DB a struct wrapping plain sql library with SQL dialect, to solve any feature @@ -62,8 +60,8 @@ type SQLDialect interface { // Inserts new rows and updates duplicates based on the key column. Upsert(query string, key string, overwrite bool, columns ...string) string - // Updates a run_details based on join (production) or inner select (test). - UpdateRunDetailsStorageState(expId string) (string, []interface{}) + // Updates a table using UPDATE with JOIN (mysql/production) or UPDATE FROM (sqlite/test). + UpdateWithJointOrFrom(targetTable, joinTable, setClause, joinClause, whereClause string) string } // MySQLDialect implements SQLDialect with mysql dialect implementation. @@ -93,11 +91,9 @@ func (d MySQLDialect) IsDuplicateError(err error) bool { return ok && sqlError.Number == mysqlerr.ER_DUP_ENTRY } -// TODO(gkcalat): deprecate resource_references table once we migration to v2beta1 is available. -func (d MySQLDialect) UpdateRunDetailsStorageState(expId string) (string, []interface{}) { - var args []interface{} - args = append(args, model.StorageStateArchived.ToString(), model.RunResourceType, expId, model.ExperimentResourceType) - return "UPDATE run_details as runs JOIN resource_references as rf ON runs.UUID = rf.ResourceUUID SET StorageState = ? WHERE rf.ResourceType = ? AND rf.ReferenceUUID = ? AND rf.ReferenceType = ?", args +// UpdateFromOrJoin TODO(gkcalat): deprecate resource_references table once we migration to v2beta1 is available. +func (d MySQLDialect) UpdateWithJointOrFrom(targetTable, joinTable, setClause, joinClause, whereClause string) string { + return fmt.Sprintf("UPDATE %s JOIN %s ON %s SET %s WHERE %s", targetTable, joinTable, joinClause, setClause, whereClause) } // SQLiteDialect implements SQLDialect with sqlite dialect implementation. @@ -143,11 +139,9 @@ func (d SQLiteDialect) IsDuplicateError(err error) bool { return ok && sqlError.Code == sqlite3.ErrConstraint } -// TODO(gkcalat): deprecate resource_references table once we migration to v2beta1 is available. -func (d SQLiteDialect) UpdateRunDetailsStorageState(expId string) (string, []interface{}) { - var args []interface{} - args = append(args, model.StorageStateArchived.ToString(), model.StorageStateArchived.ToString(), model.RunResourceType, expId, model.ExperimentResourceType, expId) - return "UPDATE run_details SET StorageState = ? WHERE StorageState <> ? AND UUID in (SELECT ResourceUUID FROM resource_references as rf WHERE (rf.ResourceType = ? AND rf.ReferenceUUID = ? AND rf.ReferenceType = ?)) OR ExperimentUUID = ?", args +// UpdateFromOrJoin TODO(gkcalat): deprecate resource_references table once we migration to v2beta1 is available. +func (d SQLiteDialect) UpdateWithJointOrFrom(targetTable, joinTable, setClause, joinClause, whereClause string) string { + return fmt.Sprintf("UPDATE %s SET %s FROM %s WHERE %s AND %s", targetTable, setClause, joinTable, joinClause, whereClause) } func NewMySQLDialect() MySQLDialect { diff --git a/backend/src/apiserver/storage/experiment_store.go b/backend/src/apiserver/storage/experiment_store.go index 49a00b56da5..a9ccab8862f 100644 --- a/backend/src/apiserver/storage/experiment_store.go +++ b/backend/src/apiserver/storage/experiment_store.go @@ -362,7 +362,14 @@ func (s *ExperimentStore) ArchiveExperiment(expId string) error { "Failed to archive experiment %s. error: '%v'", expId, err.Error()) } - query, arguments := s.db.UpdateRunDetailsStorageState(expId) + var arguments []interface{} + arguments = append(arguments, model.StorageStateArchived.ToString(), model.RunResourceType, expId, model.ExperimentResourceType) + query := s.db.UpdateWithJointOrFrom( + "run_details", + "resource_references", + "StorageState = ?", + "run_details.UUID = resource_references.ResourceUUID", + "resource_references.ResourceType = ? AND resource_references.ReferenceUUID = ? AND resource_references.ReferenceType = ?") _, err = tx.Exec(query, arguments...) if err != nil { tx.Rollback() From 06024376b94dab7d9a262d8ebed9953e741048f2 Mon Sep 17 00:00:00 2001 From: diana Date: Tue, 4 Jul 2023 16:48:54 +0300 Subject: [PATCH 3/5] Add unit tests Signed-off-by: diana --- backend/src/apiserver/storage/db_test.go | 30 +++++++++++++++++++++--- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/backend/src/apiserver/storage/db_test.go b/backend/src/apiserver/storage/db_test.go index 256ac4d263b..dbd35d35859 100644 --- a/backend/src/apiserver/storage/db_test.go +++ b/backend/src/apiserver/storage/db_test.go @@ -103,11 +103,35 @@ func TestSQLiteDialect_Upsert(t *testing.T) { } func TestMySQLDialect_Upsert(t *testing.T) { - sqliteDialect := NewMySQLDialect() - actualQuery := sqliteDialect.Upsert(`insert into table (uuid, name, namespace) values ("a", "item1", "kubeflow"),("b", "item1", "kubeflow")`, "namespace", true, []string{"uuid", "name"}...) + mysqlDialect := NewMySQLDialect() + actualQuery := mysqlDialect.Upsert(`insert into table (uuid, name, namespace) values ("a", "item1", "kubeflow"),("b", "item1", "kubeflow")`, "namespace", true, []string{"uuid", "name"}...) expectedQuery := `insert into table (uuid, name, namespace) values ("a", "item1", "kubeflow"),("b", "item1", "kubeflow") ON DUPLICATE KEY UPDATE uuid=VALUES(uuid),name=VALUES(name)` assert.Equal(t, expectedQuery, actualQuery) - actualQuery2 := sqliteDialect.Upsert(`insert into table (uuid, name, namespace) values ("a", "item1", "kubeflow"),("b", "item1", "kubeflow")`, "namespace", false, []string{"uuid", "name"}...) + actualQuery2 := mysqlDialect.Upsert(`insert into table (uuid, name, namespace) values ("a", "item1", "kubeflow"),("b", "item1", "kubeflow")`, "namespace", false, []string{"uuid", "name"}...) expectedQuery2 := `insert into table (uuid, name, namespace) values ("a", "item1", "kubeflow"),("b", "item1", "kubeflow") ON DUPLICATE KEY UPDATE uuid=uuid,name=name` assert.Equal(t, expectedQuery2, actualQuery2) } + +func TestMySQLDialect_UpdateWithJointOrFrom(t *testing.T) { + mysqlDialect := NewMySQLDialect() + actualQuery := mysqlDialect.UpdateWithJointOrFrom( + "target_table", + "other_table", + "State = ?", + "target_table.Name = other_table.Name", + "target_table.status = ?") + expectedQuery := `UPDATE target_table JOIN other_table ON target_table.Name = other_table.Name SET State = ? WHERE target_table.status = ?` + assert.Equal(t, expectedQuery, actualQuery) +} + +func TestSQLiteDialect_UpdateWithJointOrFrom(t *testing.T) { + sqliteDialect := NewSQLiteDialect() + actualQuery := sqliteDialect.UpdateWithJointOrFrom( + "target_table", + "other_table", + "State = ?", + "target_table.Name = other_table.Name", + "target_table.status = ?") + expectedQuery := `UPDATE target_table SET State = ? FROM other_table WHERE target_table.Name = other_table.Name AND target_table.status = ?` + assert.Equal(t, expectedQuery, actualQuery) +} From a61bf67fd63e8c8ab4f0506b9b97b70de68ff0a9 Mon Sep 17 00:00:00 2001 From: diana Date: Fri, 7 Jul 2023 14:06:22 +0300 Subject: [PATCH 4/5] Replace nested query for Jobs and start using pre-comit Signed-off-by: diana --- .../src/apiserver/storage/experiment_store.go | 54 +++++++------------ 1 file changed, 19 insertions(+), 35 deletions(-) diff --git a/backend/src/apiserver/storage/experiment_store.go b/backend/src/apiserver/storage/experiment_store.go index a9ccab8862f..f394e2f9039 100644 --- a/backend/src/apiserver/storage/experiment_store.go +++ b/backend/src/apiserver/storage/experiment_store.go @@ -309,6 +309,15 @@ func (s *ExperimentStore) ArchiveExperiment(expId string) error { "Failed to create query to archive experiment %s. error: '%v'", expId, err.Error()) } + var updateRunsArgs []interface{} + updateRunsArgs = append(updateRunsArgs, model.StorageStateArchived.ToString(), model.RunResourceType, expId, model.ExperimentResourceType) + updateRunsSQL := s.db.UpdateWithJointOrFrom( + "run_details", + "resource_references", + "StorageState = ?", + "run_details.UUID = resource_references.ResourceUUID", + "resource_references.ResourceType = ? AND resource_references.ReferenceUUID = ? AND resource_references.ReferenceType = ?") + updateRunsWithExperimentUUIDSql, updateRunsWithExperimentUUIDArgs, err := sq. Update("run_details"). SetMap(sq.Eq{ @@ -322,32 +331,15 @@ func (s *ExperimentStore) ArchiveExperiment(expId string) error { "Failed to create query to archive the runs in an experiment %s. error: '%v'", expId, err.Error()) } - // TODO(jingzhang36): use inner join to replace nested query for better performance. - filteredJobsSql, filteredJobsArgs, err := sq.Select("ResourceUUID"). - From("resource_references as rf"). - Where(sq.And{ - sq.Eq{"rf.ResourceType": model.JobResourceType}, - sq.Eq{"rf.ReferenceUUID": expId}, - sq.Eq{"rf.ReferenceType": model.ExperimentResourceType}, - }).ToSql() - if err != nil { - return util.NewInternalServerError(err, - "Failed to create query to filter the jobs in an experiment %s. error: '%v'", expId, err.Error()) - } + var updateJobsArgs []interface{} now := s.time.Now().Unix() - updateJobsSql, updateJobsArgs, err := sq. - Update("jobs"). - SetMap(sq.Eq{ - "Enabled": false, - "UpdatedAtInSec": now, - }). - Where(sq.Eq{"Enabled": true}). - Where(fmt.Sprintf("UUID in (%s) OR ExperimentUUID = '%s'", filteredJobsSql, expId), filteredJobsArgs...). - ToSql() - if err != nil { - return util.NewInternalServerError(err, - "Failed to create query to archive the jobs in an experiment %s. error: '%v'", expId, err.Error()) - } + updateJobsArgs = append(updateJobsArgs, false, now, model.JobResourceType, expId, model.ExperimentResourceType) + updateJobsSQL := s.db.UpdateWithJointOrFrom( + "jobs", + "resource_references", + "Enabled = ?, UpdatedAtInSec = ?", + "jobs.UUID = resource_references.ResourceUUID", + "resource_references.ResourceType = ? AND resource_references.ReferenceUUID = ? AND resource_references.ReferenceType = ?") // In a single transaction, we update experiments, run_details and jobs tables. tx, err := s.db.Begin() @@ -362,15 +354,7 @@ func (s *ExperimentStore) ArchiveExperiment(expId string) error { "Failed to archive experiment %s. error: '%v'", expId, err.Error()) } - var arguments []interface{} - arguments = append(arguments, model.StorageStateArchived.ToString(), model.RunResourceType, expId, model.ExperimentResourceType) - query := s.db.UpdateWithJointOrFrom( - "run_details", - "resource_references", - "StorageState = ?", - "run_details.UUID = resource_references.ResourceUUID", - "resource_references.ResourceType = ? AND resource_references.ReferenceUUID = ? AND resource_references.ReferenceType = ?") - _, err = tx.Exec(query, arguments...) + _, err = tx.Exec(updateRunsSQL, updateRunsArgs...) if err != nil { tx.Rollback() return util.NewInternalServerError(err, @@ -384,7 +368,7 @@ func (s *ExperimentStore) ArchiveExperiment(expId string) error { "Failed to archive runs with ExperimentUUID being %s. error: '%v'", expId, err.Error()) } - _, err = tx.Exec(updateJobsSql, updateJobsArgs...) + _, err = tx.Exec(updateJobsSQL, updateJobsArgs...) if err != nil { tx.Rollback() return util.NewInternalServerError(err, From 43eb08dcf7d50e2b48d93d1e07f31f3064850f97 Mon Sep 17 00:00:00 2001 From: diana Date: Mon, 10 Jul 2023 13:06:27 +0300 Subject: [PATCH 5/5] Fix: Use LEFT JOIN instead of INNER JOIN Signed-off-by: diana --- backend/src/apiserver/storage/db.go | 2 +- backend/src/apiserver/storage/db_test.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/backend/src/apiserver/storage/db.go b/backend/src/apiserver/storage/db.go index 2eea820df63..9afd59e4b9f 100644 --- a/backend/src/apiserver/storage/db.go +++ b/backend/src/apiserver/storage/db.go @@ -93,7 +93,7 @@ func (d MySQLDialect) IsDuplicateError(err error) bool { // UpdateFromOrJoin TODO(gkcalat): deprecate resource_references table once we migration to v2beta1 is available. func (d MySQLDialect) UpdateWithJointOrFrom(targetTable, joinTable, setClause, joinClause, whereClause string) string { - return fmt.Sprintf("UPDATE %s JOIN %s ON %s SET %s WHERE %s", targetTable, joinTable, joinClause, setClause, whereClause) + return fmt.Sprintf("UPDATE %s LEFT JOIN %s ON %s SET %s WHERE %s", targetTable, joinTable, joinClause, setClause, whereClause) } // SQLiteDialect implements SQLDialect with sqlite dialect implementation. diff --git a/backend/src/apiserver/storage/db_test.go b/backend/src/apiserver/storage/db_test.go index dbd35d35859..a89568b033c 100644 --- a/backend/src/apiserver/storage/db_test.go +++ b/backend/src/apiserver/storage/db_test.go @@ -120,7 +120,7 @@ func TestMySQLDialect_UpdateWithJointOrFrom(t *testing.T) { "State = ?", "target_table.Name = other_table.Name", "target_table.status = ?") - expectedQuery := `UPDATE target_table JOIN other_table ON target_table.Name = other_table.Name SET State = ? WHERE target_table.status = ?` + expectedQuery := `UPDATE target_table LEFT JOIN other_table ON target_table.Name = other_table.Name SET State = ? WHERE target_table.status = ?` assert.Equal(t, expectedQuery, actualQuery) }