diff --git a/backend/src/apiserver/storage/db.go b/backend/src/apiserver/storage/db.go index d15d66c72dc..9afd59e4b9f 100644 --- a/backend/src/apiserver/storage/db.go +++ b/backend/src/apiserver/storage/db.go @@ -59,6 +59,9 @@ type SQLDialect interface { // Inserts new rows and updates duplicates based on the key column. Upsert(query string, key string, overwrite bool, columns ...string) string + + // UpdateWithJointOrFrom returns an UPDATE query that joins targetTable with joinTable, using UPDATE with JOIN (mysql/production) or UPDATE FROM (sqlite/test). + UpdateWithJointOrFrom(targetTable, joinTable, setClause, joinClause, whereClause string) string } // MySQLDialect implements SQLDialect with mysql dialect implementation. @@ -88,6 +91,11 @@ func (d MySQLDialect) IsDuplicateError(err error) bool { return ok && sqlError.Number == mysqlerr.ER_DUP_ENTRY } +// UpdateWithJointOrFrom returns a MySQL "UPDATE ... LEFT JOIN ... ON ... SET ... WHERE ..." query string. TODO(gkcalat): deprecate resource_references table once migration to v2beta1 is available. +func (d MySQLDialect) UpdateWithJointOrFrom(targetTable, joinTable, setClause, joinClause, whereClause string) string { + return fmt.Sprintf("UPDATE %s LEFT JOIN %s ON %s SET %s WHERE %s", targetTable, joinTable, joinClause, setClause, whereClause) +} + // SQLiteDialect implements SQLDialect with sqlite dialect implementation. type SQLiteDialect struct{} @@ -131,6 +139,11 @@ func (d SQLiteDialect) IsDuplicateError(err error) bool { return ok && sqlError.Code == sqlite3.ErrConstraint } +// UpdateWithJointOrFrom returns a SQLite "UPDATE ... SET ... FROM ... WHERE ... AND ..." query string. TODO(gkcalat): deprecate resource_references table once migration to v2beta1 is available. 
+func (d SQLiteDialect) UpdateWithJointOrFrom(targetTable, joinTable, setClause, joinClause, whereClause string) string { + return fmt.Sprintf("UPDATE %s SET %s FROM %s WHERE %s AND %s", targetTable, setClause, joinTable, joinClause, whereClause) +} + func NewMySQLDialect() MySQLDialect { return MySQLDialect{} } diff --git a/backend/src/apiserver/storage/db_test.go b/backend/src/apiserver/storage/db_test.go index 256ac4d263b..a89568b033c 100644 --- a/backend/src/apiserver/storage/db_test.go +++ b/backend/src/apiserver/storage/db_test.go @@ -103,11 +103,35 @@ func TestSQLiteDialect_Upsert(t *testing.T) { } func TestMySQLDialect_Upsert(t *testing.T) { - sqliteDialect := NewMySQLDialect() - actualQuery := sqliteDialect.Upsert(`insert into table (uuid, name, namespace) values ("a", "item1", "kubeflow"),("b", "item1", "kubeflow")`, "namespace", true, []string{"uuid", "name"}...) + mysqlDialect := NewMySQLDialect() + actualQuery := mysqlDialect.Upsert(`insert into table (uuid, name, namespace) values ("a", "item1", "kubeflow"),("b", "item1", "kubeflow")`, "namespace", true, []string{"uuid", "name"}...) expectedQuery := `insert into table (uuid, name, namespace) values ("a", "item1", "kubeflow"),("b", "item1", "kubeflow") ON DUPLICATE KEY UPDATE uuid=VALUES(uuid),name=VALUES(name)` assert.Equal(t, expectedQuery, actualQuery) - actualQuery2 := sqliteDialect.Upsert(`insert into table (uuid, name, namespace) values ("a", "item1", "kubeflow"),("b", "item1", "kubeflow")`, "namespace", false, []string{"uuid", "name"}...) + actualQuery2 := mysqlDialect.Upsert(`insert into table (uuid, name, namespace) values ("a", "item1", "kubeflow"),("b", "item1", "kubeflow")`, "namespace", false, []string{"uuid", "name"}...) 
expectedQuery2 := `insert into table (uuid, name, namespace) values ("a", "item1", "kubeflow"),("b", "item1", "kubeflow") ON DUPLICATE KEY UPDATE uuid=uuid,name=name` assert.Equal(t, expectedQuery2, actualQuery2) } + +func TestMySQLDialect_UpdateWithJointOrFrom(t *testing.T) { + mysqlDialect := NewMySQLDialect() + actualQuery := mysqlDialect.UpdateWithJointOrFrom( + "target_table", + "other_table", + "State = ?", + "target_table.Name = other_table.Name", + "target_table.status = ?") + expectedQuery := `UPDATE target_table LEFT JOIN other_table ON target_table.Name = other_table.Name SET State = ? WHERE target_table.status = ?` + assert.Equal(t, expectedQuery, actualQuery) +} + +func TestSQLiteDialect_UpdateWithJointOrFrom(t *testing.T) { + sqliteDialect := NewSQLiteDialect() + actualQuery := sqliteDialect.UpdateWithJointOrFrom( + "target_table", + "other_table", + "State = ?", + "target_table.Name = other_table.Name", + "target_table.status = ?") + expectedQuery := `UPDATE target_table SET State = ? FROM other_table WHERE target_table.Name = other_table.Name AND target_table.status = ?` + assert.Equal(t, expectedQuery, actualQuery) +} diff --git a/backend/src/apiserver/storage/experiment_store.go b/backend/src/apiserver/storage/experiment_store.go index d254537a28d..f394e2f9039 100644 --- a/backend/src/apiserver/storage/experiment_store.go +++ b/backend/src/apiserver/storage/experiment_store.go @@ -309,31 +309,14 @@ func (s *ExperimentStore) ArchiveExperiment(expId string) error { "Failed to create query to archive experiment %s. error: '%v'", expId, err.Error()) } - // TODO(gkcalat): deprecate resource_references table once we migration to v2beta1 is available. - // TODO(jingzhang36): use inner join to replace nested query for better performance. - filteredRunsSql, filteredRunsArgs, err := sq.Select("ResourceUUID"). - From("resource_references as rf"). 
- Where(sq.And{ - sq.Eq{"rf.ResourceType": model.RunResourceType}, - sq.Eq{"rf.ReferenceUUID": expId}, - sq.Eq{"rf.ReferenceType": model.ExperimentResourceType}, - }).ToSql() - if err != nil { - return util.NewInternalServerError(err, - "Failed to create query to filter the runs in an experiment %s. error: '%v'", expId, err.Error()) - } - updateRunsSql, updateRunsArgs, err := sq. - Update("run_details"). - SetMap(sq.Eq{ - "StorageState": model.StorageStateArchived.ToString(), - }). - Where(sq.NotEq{"StorageState": model.StorageStateArchived.ToString()}). - Where(fmt.Sprintf("UUID in (%s) OR ExperimentUUID = '%s'", filteredRunsSql, expId), filteredRunsArgs...). - ToSql() - if err != nil { - return util.NewInternalServerError(err, - "Failed to create query to archive the runs in an experiment %s. error: '%v'", expId, err.Error()) - } + var updateRunsArgs []interface{} + updateRunsArgs = append(updateRunsArgs, model.StorageStateArchived.ToString(), model.RunResourceType, expId, model.ExperimentResourceType) + updateRunsSQL := s.db.UpdateWithJointOrFrom( + "run_details", + "resource_references", + "StorageState = ?", + "run_details.UUID = resource_references.ResourceUUID", + "resource_references.ResourceType = ? AND resource_references.ReferenceUUID = ? AND resource_references.ReferenceType = ?") updateRunsWithExperimentUUIDSql, updateRunsWithExperimentUUIDArgs, err := sq. Update("run_details"). @@ -348,32 +331,15 @@ func (s *ExperimentStore) ArchiveExperiment(expId string) error { "Failed to create query to archive the runs in an experiment %s. error: '%v'", expId, err.Error()) } - // TODO(jingzhang36): use inner join to replace nested query for better performance. - filteredJobsSql, filteredJobsArgs, err := sq.Select("ResourceUUID"). - From("resource_references as rf"). 
- Where(sq.And{ - sq.Eq{"rf.ResourceType": model.JobResourceType}, - sq.Eq{"rf.ReferenceUUID": expId}, - sq.Eq{"rf.ReferenceType": model.ExperimentResourceType}, - }).ToSql() - if err != nil { - return util.NewInternalServerError(err, - "Failed to create query to filter the jobs in an experiment %s. error: '%v'", expId, err.Error()) - } + var updateJobsArgs []interface{} now := s.time.Now().Unix() - updateJobsSql, updateJobsArgs, err := sq. - Update("jobs"). - SetMap(sq.Eq{ - "Enabled": false, - "UpdatedAtInSec": now, - }). - Where(sq.Eq{"Enabled": true}). - Where(fmt.Sprintf("UUID in (%s) OR ExperimentUUID = '%s'", filteredJobsSql, expId), filteredJobsArgs...). - ToSql() - if err != nil { - return util.NewInternalServerError(err, - "Failed to create query to archive the jobs in an experiment %s. error: '%v'", expId, err.Error()) - } + updateJobsArgs = append(updateJobsArgs, false, now, model.JobResourceType, expId, model.ExperimentResourceType) + updateJobsSQL := s.db.UpdateWithJointOrFrom( + "jobs", + "resource_references", + "Enabled = ?, UpdatedAtInSec = ?", + "jobs.UUID = resource_references.ResourceUUID", + "resource_references.ResourceType = ? AND resource_references.ReferenceUUID = ? AND resource_references.ReferenceType = ?") // In a single transaction, we update experiments, run_details and jobs tables. tx, err := s.db.Begin() @@ -388,7 +354,7 @@ func (s *ExperimentStore) ArchiveExperiment(expId string) error { "Failed to archive experiment %s. error: '%v'", expId, err.Error()) } - _, err = tx.Exec(updateRunsSql, updateRunsArgs...) + _, err = tx.Exec(updateRunsSQL, updateRunsArgs...) if err != nil { tx.Rollback() return util.NewInternalServerError(err, @@ -402,7 +368,7 @@ func (s *ExperimentStore) ArchiveExperiment(expId string) error { "Failed to archive runs with ExperimentUUID being %s. error: '%v'", expId, err.Error()) } - _, err = tx.Exec(updateJobsSql, updateJobsArgs...) + _, err = tx.Exec(updateJobsSQL, updateJobsArgs...) 
if err != nil { tx.Rollback() return util.NewInternalServerError(err,