fix(backend): Fix performance issue within a mysql request #9680

Merged
merged 5 commits into from
Jul 12, 2023
Changes from 3 commits
13 changes: 13 additions & 0 deletions backend/src/apiserver/storage/db.go
@@ -59,6 +59,9 @@ type SQLDialect interface {

// Inserts new rows and updates duplicates based on the key column.
Upsert(query string, key string, overwrite bool, columns ...string) string

// Updates a table using UPDATE with JOIN (mysql/production) or UPDATE FROM (sqlite/test).
UpdateWithJointOrFrom(targetTable, joinTable, setClause, joinClause, whereClause string) string
}

// MySQLDialect implements SQLDialect with mysql dialect implementation.
@@ -88,6 +91,11 @@ func (d MySQLDialect) IsDuplicateError(err error) bool {
return ok && sqlError.Number == mysqlerr.ER_DUP_ENTRY
}

// UpdateFromOrJoin TODO(gkcalat): deprecate resource_references table once we migration to v2beta1 is available.
func (d MySQLDialect) UpdateWithJointOrFrom(targetTable, joinTable, setClause, joinClause, whereClause string) string {
return fmt.Sprintf("UPDATE %s JOIN %s ON %s SET %s WHERE %s", targetTable, joinTable, joinClause, setClause, whereClause)
Member

I just realized that this actually should be LEFT JOIN because we are replacing the OR op in the WHERE clause with this.

Member Author

Yep! Thanks for catching this!

Member Author

@difince difince Jul 11, 2023

Re-opening this conversation...
I just realized that the statement below is not correct:

"I just realized that this actually should be LEFT JOIN because we are replacing the OR op in the WHERE clause with this."

This is the original query:
UPDATE run_details SET StorageState = 'ARCHIVED' WHERE StorageState <> 'ARCHIVED' AND UUID in (SELECT ResourceUUID FROM resource_references as rf WHERE (rf.ReferenceUUID = "d86e5315-09d7-4c8a-982e-4de004b2e4b6" AND rf.ResourceType = 'Run' AND rf.ReferenceType = 'Experiment')) OR ExperimentUUID = 'd86e5315-09d7-4c8a-982e-4de004b2e4b6';
It is generated by the following code:

	// TODO(gkcalat): deprecate resource_references table once we migration to v2beta1 is available.
	// TODO(jingzhang36): use inner join to replace nested query for better performance.
	filteredRunsSql, filteredRunsArgs, err := sq.Select("ResourceUUID").
		From("resource_references as rf").
		Where(sq.And{
			sq.Eq{"rf.ResourceType": model.RunResourceType},
			sq.Eq{"rf.ReferenceUUID": expId},
			sq.Eq{"rf.ReferenceType": model.ExperimentResourceType},
		}).ToSql()
	if err != nil {
		return util.NewInternalServerError(err,
			"Failed to create query to filter the runs in an experiment %s. error: '%v'", expId, err.Error())
	}
	updateRunsSql, updateRunsArgs, err := sq.
		Update("run_details").
		SetMap(sq.Eq{
			"StorageState": model.StorageStateArchived.ToString(),
		}).
		Where(sq.NotEq{"StorageState": model.StorageStateArchived.ToString()}).
		Where(fmt.Sprintf("UUID in (%s) OR ExperimentUUID = '%s'", filteredRunsSql, expId), filteredRunsArgs...).
		ToSql()
	if err != nil {
		return util.NewInternalServerError(err,
			"Failed to create query to archive the runs in an experiment %s. error: '%v'", expId, err.Error())
	}

This is the new optimized MySQL query:

UPDATE run_details JOIN resource_references ON run_details.UUID = resource_references.ResourceUUID SET StorageState = "STORAGESTATE_ARCHIVED" WHERE resource_references.ResourceType = "RUN" AND resource_references.ReferenceUUID = "<experiment_id>" AND resource_references.ReferenceType = "Experiment"

Adding LEFT JOIN would not directly reproduce the OR ExperimentUUID = 'experiment_ID' condition of the original query, because:

  1. In general, a LEFT JOIN returns every row of the left table, which is not what we want here, and
  2. The WHERE clause of the new query is too specific: WHERE resource_references.ResourceType = "RUN" AND resource_references.ReferenceUUID = "<experiment_id>" AND resource_references.ReferenceType = "Experiment".
    Even if we added an OR ExperimentUUID = 'experiment_ID' condition, it would not work because of how we JOIN the two tables. Besides, there is already a dedicated query that updates all run_details rows based on ExperimentUUID alone (ref A):
    updateRunsWithExperimentUUIDSql, updateRunsWithExperimentUUIDArgs, err := sq.
    Update("run_details").
    SetMap(sq.Eq{
    "StorageState": model.StorageStateArchived.ToString(),
    }).
    Where(sq.Eq{"ExperimentUUID": expId}).
    Where(sq.NotEq{"StorageState": model.StorageStateArchived.ToString()}).
    ToSql()
    if err != nil {

In short :), after looking at how Runs and Jobs are created, IMO it would be a bug to have a run_details record in the database without a corresponding resource_references record with ResourceType = RUN and ReferenceType = Experiment. In other words, am I right that there should be no Runs that do not belong to some Experiment?

  • If this is the case, I suggest keeping the INNER JOIN version of the optimized query and even removing (ref A); IMO it was added "just in case".

  • If I am missing something and that is not the case, I would keep the optimized INNER JOIN query plus ref A (for Runs) and write an appropriate ref B for Jobs.

Member

Thank you @difince for a thorough analysis. I agree.

}

// SQLiteDialect implements SQLDialect with sqlite dialect implementation.
type SQLiteDialect struct{}

@@ -131,6 +139,11 @@ func (d SQLiteDialect) IsDuplicateError(err error) bool {
return ok && sqlError.Code == sqlite3.ErrConstraint
}

// UpdateFromOrJoin TODO(gkcalat): deprecate resource_references table once we migration to v2beta1 is available.
func (d SQLiteDialect) UpdateWithJointOrFrom(targetTable, joinTable, setClause, joinClause, whereClause string) string {
return fmt.Sprintf("UPDATE %s SET %s FROM %s WHERE %s AND %s", targetTable, setClause, joinTable, joinClause, whereClause)
}

func NewMySQLDialect() MySQLDialect {
return MySQLDialect{}
}
30 changes: 27 additions & 3 deletions backend/src/apiserver/storage/db_test.go
@@ -103,11 +103,35 @@ func TestSQLiteDialect_Upsert(t *testing.T) {
}

func TestMySQLDialect_Upsert(t *testing.T) {
-sqliteDialect := NewMySQLDialect()
-actualQuery := sqliteDialect.Upsert(`insert into table (uuid, name, namespace) values ("a", "item1", "kubeflow"),("b", "item1", "kubeflow")`, "namespace", true, []string{"uuid", "name"}...)
+mysqlDialect := NewMySQLDialect()
+actualQuery := mysqlDialect.Upsert(`insert into table (uuid, name, namespace) values ("a", "item1", "kubeflow"),("b", "item1", "kubeflow")`, "namespace", true, []string{"uuid", "name"}...)
expectedQuery := `insert into table (uuid, name, namespace) values ("a", "item1", "kubeflow"),("b", "item1", "kubeflow") ON DUPLICATE KEY UPDATE uuid=VALUES(uuid),name=VALUES(name)`
assert.Equal(t, expectedQuery, actualQuery)
-actualQuery2 := sqliteDialect.Upsert(`insert into table (uuid, name, namespace) values ("a", "item1", "kubeflow"),("b", "item1", "kubeflow")`, "namespace", false, []string{"uuid", "name"}...)
+actualQuery2 := mysqlDialect.Upsert(`insert into table (uuid, name, namespace) values ("a", "item1", "kubeflow"),("b", "item1", "kubeflow")`, "namespace", false, []string{"uuid", "name"}...)
expectedQuery2 := `insert into table (uuid, name, namespace) values ("a", "item1", "kubeflow"),("b", "item1", "kubeflow") ON DUPLICATE KEY UPDATE uuid=uuid,name=name`
assert.Equal(t, expectedQuery2, actualQuery2)
}
+
+func TestMySQLDialect_UpdateWithJointOrFrom(t *testing.T) {
+mysqlDialect := NewMySQLDialect()
+actualQuery := mysqlDialect.UpdateWithJointOrFrom(
+"target_table",
+"other_table",
+"State = ?",
+"target_table.Name = other_table.Name",
+"target_table.status = ?")
+expectedQuery := `UPDATE target_table JOIN other_table ON target_table.Name = other_table.Name SET State = ? WHERE target_table.status = ?`
+assert.Equal(t, expectedQuery, actualQuery)
+}
+
+func TestSQLiteDialect_UpdateWithJointOrFrom(t *testing.T) {
+sqliteDialect := NewSQLiteDialect()
+actualQuery := sqliteDialect.UpdateWithJointOrFrom(
+"target_table",
+"other_table",
+"State = ?",
+"target_table.Name = other_table.Name",
+"target_table.status = ?")
+expectedQuery := `UPDATE target_table SET State = ? FROM other_table WHERE target_table.Name = other_table.Name AND target_table.status = ?`
+assert.Equal(t, expectedQuery, actualQuery)
+}
36 changes: 9 additions & 27 deletions backend/src/apiserver/storage/experiment_store.go
@@ -309,32 +309,6 @@ func (s *ExperimentStore) ArchiveExperiment(expId string) error {
"Failed to create query to archive experiment %s. error: '%v'", expId, err.Error())
}

-// TODO(gkcalat): deprecate resource_references table once we migration to v2beta1 is available.
-// TODO(jingzhang36): use inner join to replace nested query for better performance.
-filteredRunsSql, filteredRunsArgs, err := sq.Select("ResourceUUID").
-From("resource_references as rf").
-Where(sq.And{
-sq.Eq{"rf.ResourceType": model.RunResourceType},
-sq.Eq{"rf.ReferenceUUID": expId},
-sq.Eq{"rf.ReferenceType": model.ExperimentResourceType},
-}).ToSql()
-if err != nil {
-return util.NewInternalServerError(err,
-"Failed to create query to filter the runs in an experiment %s. error: '%v'", expId, err.Error())
-}
-updateRunsSql, updateRunsArgs, err := sq.
-Update("run_details").
-SetMap(sq.Eq{
-"StorageState": model.StorageStateArchived.ToString(),
-}).
-Where(sq.NotEq{"StorageState": model.StorageStateArchived.ToString()}).
-Where(fmt.Sprintf("UUID in (%s) OR ExperimentUUID = '%s'", filteredRunsSql, expId), filteredRunsArgs...).
-ToSql()
-if err != nil {
-return util.NewInternalServerError(err,
-"Failed to create query to archive the runs in an experiment %s. error: '%v'", expId, err.Error())
-}
-
updateRunsWithExperimentUUIDSql, updateRunsWithExperimentUUIDArgs, err := sq.
Update("run_details").
SetMap(sq.Eq{
@@ -388,7 +362,15 @@ func (s *ExperimentStore) ArchiveExperiment(expId string) error {
"Failed to archive experiment %s. error: '%v'", expId, err.Error())
}

-_, err = tx.Exec(updateRunsSql, updateRunsArgs...)
+var arguments []interface{}
+arguments = append(arguments, model.StorageStateArchived.ToString(), model.RunResourceType, expId, model.ExperimentResourceType)
+query := s.db.UpdateWithJointOrFrom(
+"run_details",
+"resource_references",
+"StorageState = ?",
+"run_details.UUID = resource_references.ResourceUUID",
+"resource_references.ResourceType = ? AND resource_references.ReferenceUUID = ? AND resource_references.ReferenceType = ?")
+_, err = tx.Exec(query, arguments...)
if err != nil {
tx.Rollback()
return util.NewInternalServerError(err,