-
Notifications
You must be signed in to change notification settings - Fork 317
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Only commit migration transaction if migration can be inserted into the DB #30
Changes from all commits
03676b1
14e8d15
2f43304
41db867
7afcbdc
de1a0e8
7010b20
1542240
77a90ff
9cc18c7
affde38
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -108,21 +108,12 @@ defmodule Ecto.Migrator do | |
end | ||
|
||
defp do_up(repo, version, module, opts) do | ||
run_maybe_in_transaction(repo, module, fn -> | ||
async_migrate_maybe_in_transaction(repo, version, module, :up, opts, fn -> | ||
attempt(repo, version, module, :forward, :up, :up, opts) | ||
|| attempt(repo, version, module, :forward, :change, :up, opts) | ||
|| {:error, Ecto.MigrationError.exception( | ||
"#{inspect module} does not implement a `up/0` or `change/0` function")} | ||
end) | ||
|> case do | ||
:ok -> | ||
verbose_schema_migration repo, "update schema migrations", fn -> | ||
SchemaMigration.up(repo, version, opts[:prefix]) | ||
end | ||
:ok | ||
error -> | ||
error | ||
end | ||
end | ||
|
||
@doc """ | ||
|
@@ -153,41 +144,65 @@ defmodule Ecto.Migrator do | |
end | ||
|
||
defp do_down(repo, version, module, opts) do | ||
run_maybe_in_transaction(repo, module, fn -> | ||
async_migrate_maybe_in_transaction(repo, version, module, :down, opts, fn -> | ||
attempt(repo, version, module, :forward, :down, :down, opts) | ||
|| attempt(repo, version, module, :backward, :change, :down, opts) | ||
|| {:error, Ecto.MigrationError.exception( | ||
"#{inspect module} does not implement a `down/0` or `change/0` function")} | ||
end) | ||
|> case do | ||
:ok -> | ||
end | ||
|
||
defp async_migrate_maybe_in_transaction(repo, version, module, direction, opts, fun) do | ||
parent = self() | ||
ref = make_ref() | ||
task = Task.async(fn -> run_maybe_in_transaction(parent, ref, repo, module, fun) end) | ||
|
||
if migrated_successfully?(ref, task.pid) do | ||
try do | ||
# The table with schema migrations can only be updated from | ||
# the parent process because it has a lock on the table | ||
verbose_schema_migration repo, "update schema migrations", fn -> | ||
SchemaMigration.down(repo, version, opts[:prefix]) | ||
apply(SchemaMigration, direction, [repo, version, opts[:prefix]]) | ||
end | ||
:ok | ||
error -> | ||
error | ||
catch | ||
kind, error -> | ||
Task.shutdown(task, :brutal_kill) | ||
:erlang.raise(kind, error, System.stacktrace()) | ||
end | ||
end | ||
|
||
send(task.pid, ref) | ||
Task.await(task, :infinity) | ||
end | ||
|
||
defp run_maybe_in_transaction(repo, module, fun) do | ||
fn -> do_run_maybe_in_transaction(repo, module, fun) end | ||
|> Task.async() | ||
|> Task.await(:infinity) | ||
defp migrated_successfully?(ref, pid) do | ||
receive do | ||
{^ref, :ok} -> true | ||
{^ref, _} -> false | ||
{:EXIT, ^pid, _} -> false | ||
end | ||
end | ||
|
||
defp do_run_maybe_in_transaction(repo, module, fun) do | ||
cond do | ||
module.__migration__[:disable_ddl_transaction] -> | ||
fun.() | ||
repo.__adapter__.supports_ddl_transaction? -> | ||
{:ok, result} = repo.transaction(fun, log: false, timeout: :infinity) | ||
result | ||
true -> | ||
fun.() | ||
defp run_maybe_in_transaction(parent, ref, repo, module, fun) do | ||
if module.__migration__[:disable_ddl_transaction] || | ||
not repo.__adapter__.supports_ddl_transaction? do | ||
send_and_receive(parent, ref, fun.()) | ||
else | ||
{:ok, result} = | ||
repo.transaction( | ||
fn -> send_and_receive(parent, ref, fun.()) end, | ||
log: false, timeout: :infinity | ||
) | ||
|
||
result | ||
end | ||
catch kind, reason -> | ||
{kind, reason, System.stacktrace} | ||
send_and_receive(parent, ref, {kind, reason, System.stacktrace}) | ||
end | ||
|
||
defp send_and_receive(parent, ref, value) do | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. What do you think about naming this report_migration_result(parent, {ref, value})
wait_for_schema_migrations_update(ref, value) I'm trying to be more clear about why this "two-step" flow is required. With that goal in mind, the code above: try do
receive do
{^ref, :ok} ->
verbose_schema_migration repo, "update schema migrations", fn ->
apply(SchemaMigration, direction, [repo, version, opts[:prefix]])
end
{^ref, _} ->
:ok
{:EXIT, ^pid, _} ->
:ok
end
catch
kind, error ->
Task.shutdown(task, :brutal_kill)
:erlang.raise(kind, error, System.stacktrace())
else
_ ->
send(task.pid, ref)
Task.await(task, :infinity)
end could be abstracted to: parent = self()
result_ref = make_ref()
task = Task.async(fn -> run_maybe_in_transaction(parent, result_ref, repo, module, fun) end)
case wait_for_migration_result(task, result_ref) do
:ok ->
# The table with schema migrations can only be updated from the parent process
# because it has a lock acquired on that table.
report_schema_migrations_update(task, fn ->
verbose_schema_migration repo, "update schema migrations", fn ->
apply(SchemaMigration, direction, [repo, version, opts[:prefix]])
end
end)
_ ->
:ok
end)
Task.await(task, :infinity) Thoughts? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I like it. I pushed something similar, although I kept the try catch inline because I don't want to hide the task management into a bunch of functions and instead keep it all in one place. :) |
||
send parent, {ref, value} | ||
receive do: (^ref -> value) | ||
end | ||
|
||
defp attempt(repo, version, module, direction, operation, reference, opts) do | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is an alternative approach to this, which is to use advisory locks.
I wrote a patch to Rails a couple of years ago that added this functionality to ActiveRecord. It avoids the problem of locking the schema_migrations table in a separate transaction.
That way you don't need two transactions at all, and can commit to the schema_migrations table directly from the main migration transaction.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if we use advisory lock perhaps we can rollback the change that requires
pool_size: 2
for migrations? Ref: elixir-ecto/ecto#2258There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@samphilipd that's nice to hear. Can you dig a link to that commit/PR please?
Although I would wait before migrating to advisory locks because we need to support the current mechanism anyway for other databases.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@josevalim Both MySQL and Postgres support advisory locks. I am not sure about other databases, these are the only two that Rails guarantees safe concurrent migrations for AFAIK.
@wojtekmach it seems likely that yes, you will be able to do the migration with only one connection if you use advisory locks.
Take a look at migration.rb in ActiveRecord.
My original PR is here.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the info! There is also a discussion here and here.