-
Notifications
You must be signed in to change notification settings - Fork 2.1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
WIP -- Working to support migration-scoped queries from VTGate #2606
Conversation
go/vt/tabletserver/query_executor.go
Outdated
@@ -295,25 +295,38 @@ func (qre *QueryExecutor) execDDL() (*sqltypes.Result, error) { | |||
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "DDL is not understood") | |||
} | |||
|
|||
if qre.transactionID != 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Liquibase disables autocommit so that it can handle some changesets such as adding a not-null constraint with a default value. This particular changeset results in 2 queries: 1 to update the existing values, and 1 to add the constraint. These should happen in a single transaction. Liquibase disables setAutoCommit, so the JDBC does a BEGIN to start the transaction. This works through vtgate, but when vttablet sees it it starts its own transaction because it's a DDL. The two transactions actually deadlock each other in this case.
The downside of this change is that for these multi-query transactions we no longer call schema.Engine.Reload()
. Trying to think of ways around that. Can we mark somewhere (in the session?) to reload the schema on commit?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I ended up having to fix this. I went ahead and added a marker on the TxConnection -- see ReloadSchemaOnCommit
. Not sure if you have a different approach you'd prefer @sougou
53963f5
to
c35a4e6
Compare
…onflicts with force_eof
…s an NPE on Table.Name, since creates are the only one to only have a NewName
…yntax in the test failed to parse and was incorrect according to docs
c35a4e6
to
ea437b5
Compare
@sougou if you can take a look when you have a chance. This is working for me now, I was able to run a number of migrations through this. I commented on one area we should specifically look at. Another gotcha I'd like to tackle is: Even though I'm using ExecuteShards, I had to turn strict mode off. I'd like to have an in-between mode. Everything else other than migrations should be treated as strict. We shouldn't be doing sharding or filtered replication while running a migration. I have a separate branch for supporting ExecuteShards in the JDBC client. I also am not totally sure what tests to add, but will look to add some. |
I'm not sure what's up with travis. The endtoend tests fail, looks like they fail trying to create a json column in mariadb, with maria returning a syntax error. Running them locally works, but they've consistently failed in travis. |
Are sure that's the problem? There's a hanging transaction and then a stacktrace gets printed? Hanging may be here?
|
Hmm not totally sure how to grok those various stacktraces around the hang. I'm guessing it could be any one of them? I'm not able to reproduce locally, but I did make changes to the DDL transactions in query_executor.go. So will try to see if I can reproduce or narrow it down. |
Not sure why I wasn't triggering the failure locally, but I pushed a fix and Travis is happy again! |
Travis usually has higher load and therefore it sometimes exposes races which don't show up on a beefy, idle workstation. Some suggestions to reproduce such a problem:
|
go/vt/sqlparser/ast.go
Outdated
// Lowered returns a new TableName where the Name and Qualifier have been converted | ||
// to lower case | ||
func (node *TableName) Lowered() *TableName { | ||
return &TableName{Name: node.Name.Lowered(), Qualifier: node.Qualifier.Lowered()} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looks like you're using this for lower-casing view names. If so, Qualifier
should not be lowered. So, you may have to also rename this function to ToViewName
.
go/vt/sqlparser/ast.go
Outdated
@@ -2065,6 +2071,12 @@ func (node *TableIdent) UnmarshalJSON(b []byte) error { | |||
return nil | |||
} | |||
|
|||
// Lowered returns a new TableIdent where the backing string has been | |||
// converted to lowercase | |||
func (node TableIdent) Lowered() TableIdent { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this function should generally not be used from outside this package, you could make it private, or even get rid of it and do the conversion directly in ToViewName
.
@@ -329,19 +329,6 @@ func (se *Engine) TableWasCreatedOrAltered(ctx context.Context, tableName string | |||
return nil | |||
} | |||
|
|||
// TableWasDropped must be called if a table was dropped. | |||
func (se *Engine) TableWasDropped(tableName sqlparser.TableIdent) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Need to delete TableWasCreatedOrAltered
also.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TableWasCreatedOrAltered
is used by Reload. I can still remove from that function if you'd like. But I wasn't sure of the implications.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It looks like it plays a pretty big role in Reload
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Forgot about that. This is ok then.
@@ -128,11 +128,17 @@ func TestDDLPlan(t *testing.T) { | |||
t.Fatalf("Error marshalling %v", plan) | |||
} | |||
matchString(t, tcase.lineno, expected["Action"], plan.Action) | |||
matchString(t, tcase.lineno, expected["TableName"], plan.TableName.String()) | |||
matchString(t, tcase.lineno, expected["NewName"], plan.NewName.String()) | |||
matchString(t, tcase.lineno, expected["TableName"], renderTableName(plan.TableName)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use sqlparser.String
instead.
go/vt/tabletserver/query_executor.go
Outdated
@@ -295,25 +295,33 @@ func (qre *QueryExecutor) execDDL() (*sqltypes.Result, error) { | |||
return nil, vterrors.Errorf(vtrpcpb.Code_INVALID_ARGUMENT, "DDL is not understood") | |||
} | |||
|
|||
conn, err := qre.tsv.te.txPool.LocalBegin(qre.ctx) | |||
if qre.transactionID != 0 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should probably fail here instead. DDLs have an implicit commit in them. So, this will mess up our internal transaction state.
The other option is that we end the transaction here with our own commit. This will at least cause future DMLs against the transaction to fail, which is the right behavior.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you explain a little more about why you think we should do that? I'm not sure if my original use-case was lost in the commit stream, but here's what I see from my side:
Liquibase (a major java schema migration manager) converts a user-specified "changeset" into a bundle of queries. In many cases that results in just 1 query, like an ALTER/CREATE/etc. But for some changesets, that can be multiple queries. Here's an example:
<changeSet id="Populate initial values and make not null" author="bbeaudreault" context="job">
<addNotNullConstraint tableName="todo" columnName="list_id" defaultNullValue="1" columnDataType="int"/>
</changeSet>
This is a reasonably common example: Adding a not-null constraint on a column, and first setting the default value on all existing rows. The way to handle this is to do 2 queries, wrapped in a transaction:
2017-03-02T20:34:26.971972Z 40 Query begin
2017-03-02T20:34:26.972148Z 40 Query update vttest.todo set list_id = '1' where list_id is null/* vtgate:: filtered_replication_unfriendly */
2017-03-02T20:34:27.004152Z 40 Query ALTER TABLE vttest.todo MODIFY list_id INT NOT NULL
2017-03-02T20:34:27.061314Z 40 Query commit
The above works great with the changes in this PR. However, previously the queries would come in like this:
begin
update vttest.todo set list_id = '1' where list_id is null/* vtgate:: filtered_replication_unfriendly */
begin
ALTER TABLE vttest.todo MODIFY list_id INT NOT NULL
--- unreachable:
commit
commit
You may be right that all DDLs have an implicit commit -- is that a MySQL thing, or a vitess thing? If Vitess, is there a reason that's the case? Some other way we can do it?
The way I see it is this is a reasonable use-case, and one used by a major tool that is very popular in the java community. So there should be a way to support it, not throw an exception.
One option, which would solve this case but not sure about all other changesets: We can still do the implicit commit, but only call LocalBegin if we're not already in a transaction.
Thoughts?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see at https://dev.mysql.com/doc/refman/5.7/en/implicit-commit.html that it is a MySQL restriction. Since other places in vitess can create transactions, i.e. BeginExecute, should I instead just commit any active transaction and let the rest of the function work as it used to?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry @sougou I think I misread your comment/suggestion based on my lack of experience with this. I am going to try to implement your second suggestion, which is to commit any running transaction before starting the new one.
go/vt/tabletserver/query_executor.go
Outdated
return result, nil | ||
} | ||
|
||
result, err := qre.execAsTransaction(func(conn *TxConnection) (*sqltypes.Result, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Don't know why the other code was hanging, but this is the right thing to do anyway.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it was because I was not doing a commit or rollback on error -- prior to this PR we were deferring the LocalCommit, so it always happened even on failure. I had changed it to an explicit LocalCommit on success. Something with the extra load on travis was causing execSql to fail and the commit never happened and the connection never returned to the pool. Using execAsTransaction properly handles cleanup now. That's my guess at least.
go/vt/tabletserver/tx_pool.go
Outdated
@@ -171,12 +172,24 @@ func (axp *TxPool) Begin(ctx context.Context) (int64, error) { | |||
} | |||
|
|||
// Commit commits the specified transaction. | |||
func (axp *TxPool) Commit(ctx context.Context, transactionID int64, messager *MessagerEngine) error { | |||
func (axp *TxPool) Commit(ctx context.Context, transactionID int64, messager *MessagerEngine, schemaEngine *schema.Engine) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This change shouldn't be needed. We should commit & reload schema in QueryExecutor instead (if we still want to allow this).
… -- DDL's have an implicit commit.
I've pushed a version where we close the open transaction. I'm going to do some internal testing to see how that goes. I am mildly concerned that the transactionIDs will somehow be impacted in vtgate and the JDBC driver, but we'll see. |
The new approach does not work, sort of predictably on reflection. Liquibase tries to call commit at the end, and the transaction does not exist anymore. So vttablet returns:
I've noticed that |
The connections used by vitess all have autocommit=ON. This is required for connection pools. We have three options:
|
I've implemented an initial solution using option 2. I'm testing it out internally while travis runs, to see if this works for the Liquibase use-case. |
@sougou this worked for me. If you're ok with the approach I will add tests to ensure we are not leaking transactions or anything for the defined use-case. I've run a 30 or so migrations through this and see no leaked transactions (based on no |
go/vt/tabletserver/query_executor.go
Outdated
return nil, err | ||
} | ||
err = qre.tsv.te.txPool.LocalCommit(qre.ctx, conn, qre.tsv.messager) | ||
err := qre.tsv.te.txPool.Commit(qre.ctx, qre.transactionID, qre.tsv.messager) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think you can rely on execAsTransaction
for this code path because it starts its own transaction. You have to perform your own qre.execSQL
here. So, the flow should be:
txPool.Get
defer conn.Recycle
qre.execSQL
conn.BeginAgain (this is a method of TxConn)
return (skip execAsTransaction)
This way, the current TxConn retains its transaction id and it remains in the active pool. So, the subsequent commit from the app will be honored.
go/vt/tabletserver/tx_pool.go
Outdated
@@ -141,6 +141,22 @@ func (axp *TxPool) WaitForEmpty() { | |||
// Begin begins a transaction, and returns the associated transaction id. | |||
// Subsequent statements can access the connection through the transaction id. | |||
func (axp *TxPool) Begin(ctx context.Context) (int64, error) { | |||
return axp.internalBegin(ctx, func() int64 { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
TxPool need not change. Instead, add BeginAgain to TxConn. Make it execute a commit
and begin
. The commit ensures that anything accidentally uncommitted will not get lost.
Thanks for the feedback. Made changes based on suggestion. Testing in my environment now |
go/vt/sqlparser/ast.go
Outdated
func (node *TableName) ToViewName() *TableName { | ||
return &TableName{ | ||
Name: NewTableIdent(strings.ToLower(node.Name.v)), | ||
Qualifier: NewTableIdent(strings.ToLower(node.Qualifier.v)), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One last nit. I think this should not be lowered because qualifiers are case-sensitive. That's what I was trying to say in my earlier comment.
@@ -329,19 +329,6 @@ func (se *Engine) TableWasCreatedOrAltered(ctx context.Context, tableName string | |||
return nil | |||
} | |||
|
|||
// TableWasDropped must be called if a table was dropped. | |||
func (se *Engine) TableWasDropped(tableName sqlparser.TableIdent) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Forgot about that. This is ok then.
Thanks! |
I saw travis failed after we merged somehow. I just kicked off another build and if it continues to fail I'll try to fix in the morning |
Everything looks good on re-run |
This will be WIP for a few weeks, but I wanted to start having travis build/test it now as I do some testing internally. I'll ping for a review when ready.
The goal here is to provide basic initial support for running migrations through VTGate. For now I am using ExecuteShards. In the future we use a different fronting endpoint, and can unify with schemamanger/schema_swap. But first step is to be able to route them through VTGate at all.
I'll be adding to this PR as I encounter blockers during my internal tests.