Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Only commit migration transaction if migration can be inserted into the DB #30

Merged
merged 11 commits into from
Nov 7, 2018
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 7 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,11 @@ otp_release:
- 20.2
addons:
apt:
sources:
- mysql-5.7-trusty
packages:
- mysql-server-5.6
- mysql-client-core-5.6
- mysql-client-5.6
- mysql-server
- mysql-client
before_install:
- sudo service postgresql stop
- sudo apt-get -y -qq --purge remove postgresql libpq-dev libpq5 postgresql-client-common postgresql-common
Expand All @@ -17,6 +18,9 @@ before_install:
- wget --quiet -O - http://apt.postgresql.org/pub/repos/apt/ACCC4CF8.asc | sudo apt-key add -
- sudo apt-get update -qq
- sudo apt-get -y -o Dpkg::Options::=--force-confdef -o Dpkg::Options::="--force-confnew" install postgresql-$PGVERSION postgresql-contrib-$PGVERSION
after_install:
- sudo mysql_upgrade -u root -p --force
- sudo service mysqld --full-restart
sudo: required
dist: trusty
cache: apt
Expand Down
71 changes: 58 additions & 13 deletions integration_test/sql/migrator.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ defmodule Ecto.Integration.MigratorTest do
use Ecto.Integration.Case

import Support.FileHelpers
import Ecto.Migrator, only: [migrated_versions: 1]
import ExUnit.CaptureLog

alias Ecto.Integration.PoolRepo
alias Ecto.Migration.SchemaMigration
Expand All @@ -14,15 +14,38 @@ defmodule Ecto.Integration.MigratorTest do
:ok
end

defmodule AnotherSchemaMigration do
use Ecto.Migration

def change do
execute PoolRepo.create_prefix("bad_schema_migrations"),
PoolRepo.drop_prefix("bad_schema_migrations")

create table(:schema_migrations, prefix: "bad_schema_migrations") do
add :version, :string
add :inserted_at, :integer
end
end
end

defmodule BrokenLinkMigration do
use Ecto.Migration

def change do
Task.start_link(fn -> raise "oops" end)
Process.sleep(:infinity)
end
end

defmodule GoodMigration do
use Ecto.Migration

def up do
:ok
create table(:good_migration)
end

def down do
:ok
drop table(:good_migration)
end
end

Expand All @@ -36,18 +59,14 @@ defmodule Ecto.Integration.MigratorTest do

import Ecto.Migrator
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor nitpick: should this be moved to the top where the line import Ecto.Migrator, only: [migrated_versions: 1] was?


test "schema migration" do
up(PoolRepo, 30, GoodMigration, log: false)

[migration] = PoolRepo.all(SchemaMigration)
assert migration.version == 30
assert migration.inserted_at
end

test "migrations up and down" do
assert migrated_versions(PoolRepo) == []
assert up(PoolRepo, 31, GoodMigration, log: false) == :ok

[migration] = PoolRepo.all(SchemaMigration)
assert migration.version == 31
assert migration.inserted_at

assert migrated_versions(PoolRepo) == [31]
assert up(PoolRepo, 31, GoodMigration, log: false) == :already_up
assert migrated_versions(PoolRepo) == [31]
Expand All @@ -57,10 +76,37 @@ defmodule Ecto.Integration.MigratorTest do
assert migrated_versions(PoolRepo) == []
end

test "bad migration" do
test "does not commit migration if insert into schema migration fails" do
# First we create a new schema migration table in another prefix
assert up(PoolRepo, 33, AnotherSchemaMigration, log: false) == :ok
assert migrated_versions(PoolRepo) == [33]

assert capture_log(fn ->
catch_error(up(PoolRepo, 34, GoodMigration, log: false, prefix: "bad_schema_migrations"))
catch_error(PoolRepo.all("good_migration"))
catch_error(PoolRepo.all("good_migration", prefix: "bad_schema_migrations"))
end) =~ "Could not update schema migrations"

assert down(PoolRepo, 33, AnotherSchemaMigration, log: false) == :ok
end

test "bad execute migration" do
assert catch_error(up(PoolRepo, 31, BadMigration, log: false))
end

test "broken link migration" do
Process.flag(:trap_exit, true)

assert capture_log(fn ->
{:ok, pid} = Task.start_link(fn -> up(PoolRepo, 31, BrokenLinkMigration, log: false) end)
assert_receive {:EXIT, ^pid, _}
end) =~ "oops"

assert capture_log(fn ->
catch_exit(up(PoolRepo, 31, BrokenLinkMigration, log: false))
end) =~ "oops"
end

test "run up to/step migration" do
in_tmp fn path ->
create_migration(47)
Expand Down Expand Up @@ -140,7 +186,6 @@ defmodule Ecto.Integration.MigratorTest do
defmodule #{module} do
use Ecto.Migration


def up do
update &[#{num}|&1]
end
Expand Down
83 changes: 49 additions & 34 deletions lib/ecto/migrator.ex
Original file line number Diff line number Diff line change
Expand Up @@ -108,21 +108,12 @@ defmodule Ecto.Migrator do
end

defp do_up(repo, version, module, opts) do
run_maybe_in_transaction(repo, module, fn ->
async_migrate_maybe_in_transaction(repo, version, module, :up, opts, fn ->
attempt(repo, version, module, :forward, :up, :up, opts)
|| attempt(repo, version, module, :forward, :change, :up, opts)
|| {:error, Ecto.MigrationError.exception(
"#{inspect module} does not implement a `up/0` or `change/0` function")}
end)
|> case do
:ok ->
verbose_schema_migration repo, "update schema migrations", fn ->
SchemaMigration.up(repo, version, opts[:prefix])
end
:ok
error ->
error
end
end

@doc """
Expand Down Expand Up @@ -153,41 +144,65 @@ defmodule Ecto.Migrator do
end

defp do_down(repo, version, module, opts) do
run_maybe_in_transaction(repo, module, fn ->
async_migrate_maybe_in_transaction(repo, version, module, :down, opts, fn ->
attempt(repo, version, module, :forward, :down, :down, opts)
|| attempt(repo, version, module, :backward, :change, :down, opts)
|| {:error, Ecto.MigrationError.exception(
"#{inspect module} does not implement a `down/0` or `change/0` function")}
end)
|> case do
:ok ->
verbose_schema_migration repo, "update schema migrations", fn ->
SchemaMigration.down(repo, version, opts[:prefix])
end
:ok
error ->
error
end
end

defp run_maybe_in_transaction(repo, module, fun) do
fn -> do_run_maybe_in_transaction(repo, module, fun) end
|> Task.async()
|> Task.await(:infinity)
defp async_migrate_maybe_in_transaction(repo, version, module, direction, opts, fun) do
parent = self()
ref = make_ref()

%{pid: pid} = task =
Task.async(fn -> run_maybe_in_transaction(parent, ref, repo, module, fun) end)

try do
receive do
{^ref, :ok} ->
verbose_schema_migration repo, "update schema migrations", fn ->
apply(SchemaMigration, direction, [repo, version, opts[:prefix]])
end

{^ref, _} ->
:ok

{:EXIT, ^pid, _} ->
:ok
end
catch
kind, error ->
Task.shutdown(task, :brutal_kill)
:erlang.raise(kind, error, System.stacktrace())
else
_ ->
send(task.pid, ref)
Task.await(task, :infinity)
end
end

defp do_run_maybe_in_transaction(repo, module, fun) do
cond do
module.__migration__[:disable_ddl_transaction] ->
fun.()
repo.__adapter__.supports_ddl_transaction? ->
{:ok, result} = repo.transaction(fun, log: false, timeout: :infinity)
result
true ->
fun.()
defp run_maybe_in_transaction(parent, ref, repo, module, fun) do
if module.__migration__[:disable_ddl_transaction] ||
not repo.__adapter__.supports_ddl_transaction? do
send_and_receive(parent, ref, fun.())
else
{:ok, result} =
repo.transaction(
fn -> send_and_receive(parent, ref, fun.()) end,
log: false, timeout: :infinity
)

result
end
catch kind, reason ->
{kind, reason, System.stacktrace}
send_and_receive(parent, ref, {kind, reason, System.stacktrace})
end

defp send_and_receive(parent, ref, value) do
Copy link
Contributor

@fertapric fertapric Nov 7, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What do you think about naming this report_and_wait_for_schema_migrations_update or similar? It can also be splitted into:

report_migration_result(parent, {ref, value})
wait_for_schema_migrations_update(ref, value)

I'm trying to be more clear about why this "two-step" flow is required. With that goal in mind, the code above:

try do
  receive do
    {^ref, :ok} ->
      verbose_schema_migration repo, "update schema migrations", fn ->
        apply(SchemaMigration, direction, [repo, version, opts[:prefix]])
      end
      
    {^ref, _} ->
      :ok
      
    {:EXIT, ^pid, _} ->
      :ok
  end
catch
  kind, error ->
    Task.shutdown(task, :brutal_kill)
    :erlang.raise(kind, error, System.stacktrace())
else
  _ ->
    send(task.pid, ref)
    Task.await(task, :infinity)
end

could be abstracted to:

parent = self()
result_ref = make_ref()

task = Task.async(fn -> run_maybe_in_transaction(parent, result_ref, repo, module, fun) end)

case wait_for_migration_result(task, result_ref) do
  :ok ->
    # The table with schema migrations can only be updated from the parent process
    # because it has a lock acquired on that table.
    report_schema_migrations_update(task, fn ->
      verbose_schema_migration repo, "update schema migrations", fn ->
        apply(SchemaMigration, direction, [repo, version, opts[:prefix]])
      end
    end)
  
  _ ->
    :ok
end)

Task.await(task, :infinity)

Thoughts?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like it. I pushed something similar, although I kept the try catch inline because I don't want to hide the task management into a bunch of functions and instead keep it all in one place. :)

send parent, {ref, value}
receive do: (^ref -> value)
end

defp attempt(repo, version, module, direction, operation, reference, opts) do
Expand Down