Skip to content

Commit

Permalink
fix: fix case where batch before/after action callbacks could be skipped
Browse files Browse the repository at this point in the history
  • Loading branch information
zachdaniel committed Feb 22, 2025
1 parent 4cc61df commit 25b15d3
Show file tree
Hide file tree
Showing 7 changed files with 335 additions and 173 deletions.
70 changes: 38 additions & 32 deletions lib/ash/actions/create/bulk.ex
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ defmodule Ash.Actions.Create.Bulk do
opts[:tenant]
)

{batch, must_be_simple} =
{batch, must_be_simple_results} =
batch
|> Stream.map(fn changeset ->
Ash.Changeset.require_values(
Expand All @@ -457,43 +457,49 @@ defmodule Ash.Actions.Create.Bulk do
changeset.action.require_attributes
)
end)
|> Enum.reduce({[], []}, fn changeset, {batch, must_be_simple} ->
if changeset.around_transaction in [[], nil] and changeset.after_transaction in [[], nil] and
changeset.around_action in [[], nil] do
changeset = Ash.Changeset.run_before_transaction_hooks(changeset)
{[changeset | batch], must_be_simple}
else
{batch, [%{changeset | __validated_for_action__: action.name} | must_be_simple]}
end
end)

must_be_simple_results =
Enum.flat_map(must_be_simple, fn changeset ->
case Ash.Actions.Create.run(domain, changeset, action, opts) do
{:ok, result} ->
Process.put({:any_success?, ref}, true)
|> Ash.Actions.Helpers.split_and_run_simple(
action,
opts,
changes,
all_changes,
:bulk_create,
fn changeset ->
case Ash.Actions.Create.run(domain, changeset, action, opts) do
{:ok, result} ->
Process.put({:any_success?, ref}, true)

[
Ash.Resource.set_metadata(result, %{
bulk_create_index: changeset.context.bulk_create.index
})
]
[
Ash.Resource.set_metadata(result, %{
bulk_create_index: changeset.context.bulk_create.index
})
]

{:ok, result, notifications} ->
Process.put({:any_success?, ref}, true)
{:ok, result, notifications} ->
Process.put({:any_success?, ref}, true)

store_notification(ref, notifications, opts)
store_notification(ref, notifications, opts)

[
Ash.Resource.set_metadata(result, %{
bulk_create_index: changeset.context.bulk_create.index
})
]
[
Ash.Resource.set_metadata(result, %{
bulk_create_index: changeset.context.bulk_create.index
})
]

{:error, error} ->
store_error(ref, error, opts)
[]
{:error, error} ->
store_error(ref, error, opts)
[]
end
end
)

batch =
Enum.reject(batch, fn
%{valid?: false} = changeset ->
store_error(ref, changeset, opts)
true

_changeset ->
false
end)

if opts[:transaction] == :batch &&
Expand Down
106 changes: 52 additions & 54 deletions lib/ash/actions/destroy/bulk.ex
Original file line number Diff line number Diff line change
Expand Up @@ -267,15 +267,6 @@ defmodule Ash.Actions.Destroy.Bulk do
end
)

context_key =
case atomic_changeset.action.type do
:update ->
:bulk_update

:destroy ->
:bulk_destroy
end

if (has_after_batch_hooks? || !Enum.empty?(atomic_changeset.after_action)) &&
Keyword.get(opts, :transaction, true) do
Ash.DataLayer.transaction(
Expand All @@ -291,7 +282,7 @@ defmodule Ash.Actions.Destroy.Bulk do
end,
opts[:timeout],
%{
type: context_key,
type: :bulk_destroy,
metadata: %{
resource: query.resource,
action: atomic_changeset.action.name,
Expand Down Expand Up @@ -1367,61 +1358,68 @@ defmodule Ash.Actions.Destroy.Bulk do
:bulk_destroy
)

{batch, must_be_simple} =
Enum.reduce(batch, {[], []}, fn changeset, {batch, must_be_simple} ->
if changeset.around_transaction in [[], nil] and changeset.after_transaction in [[], nil] and
changeset.around_action in [[], nil] do
changeset = Ash.Changeset.run_before_transaction_hooks(changeset)
{[changeset | batch], must_be_simple}
else
{batch, [%{changeset | __validated_for_action__: action.name} | must_be_simple]}
end
end)

must_be_simple_results =
Enum.flat_map(must_be_simple, fn changeset ->
case Ash.Actions.Destroy.run(
domain,
changeset,
action,
Keyword.put(opts, :return_destroyed?, opts[:return_records?])
) do
:ok ->
Process.put({:any_success?, ref}, true)
{batch, must_be_simple_results} =
Ash.Actions.Helpers.split_and_run_simple(
batch,
action,
opts,
changes,
all_changes,
:bulk_destroy,
fn changeset ->
case Ash.Actions.Destroy.run(
domain,
changeset,
action,
Keyword.put(opts, :return_destroyed?, opts[:return_records?])
) do
:ok ->
Process.put({:any_success?, ref}, true)

[]
[]

{:ok, result} when not is_list(result) ->
Process.put({:any_success?, ref}, true)
{:ok, result} when not is_list(result) ->
Process.put({:any_success?, ref}, true)

[
Ash.Resource.set_metadata(result, %{
bulk_destroy_index: changeset.context.bulk_destroy.index
})
]
[
Ash.Resource.set_metadata(result, %{
bulk_destroy_index: changeset.context.bulk_destroy.index
})
]

{:ok, notifications} ->
Process.put({:any_success?, ref}, true)
{:ok, notifications} ->
Process.put({:any_success?, ref}, true)

store_notification(ref, notifications, opts)
store_notification(ref, notifications, opts)

[]
[]

{:ok, result, notifications} ->
Process.put({:any_success?, ref}, true)
{:ok, result, notifications} ->
Process.put({:any_success?, ref}, true)

store_notification(ref, notifications, opts)
store_notification(ref, notifications, opts)

[
Ash.Resource.set_metadata(result, %{
bulk_destroy_index: changeset.context.bulk_destroy.index
})
]
[
Ash.Resource.set_metadata(result, %{
bulk_destroy_index: changeset.context.bulk_destroy.index
})
]

{:error, error} ->
store_error(ref, error, opts)
[]
{:error, error} ->
store_error(ref, error, opts)
[]
end
end
)

batch =
Enum.reject(batch, fn
%{valid?: false} = changeset ->
store_error(ref, changeset, opts)
true

_changeset ->
false
end)

if opts[:transaction] == :batch &&
Expand Down
113 changes: 113 additions & 0 deletions lib/ash/actions/helpers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,119 @@ defmodule Ash.Actions.Helpers do
require Logger
require Ash.Flags

def split_and_run_simple(batch, action, opts, changes, all_changes, context_key, callback) do
{batch, must_be_simple} =
Enum.reduce(batch, {[], []}, fn changeset, {batch, must_be_simple} ->
if changeset.around_transaction in [[], nil] and changeset.after_transaction in [[], nil] and
changeset.around_action in [[], nil] do
changeset = Ash.Changeset.run_before_transaction_hooks(changeset)
{[changeset | batch], must_be_simple}
else
{batch, [%{changeset | __validated_for_action__: action.name} | must_be_simple]}
end
end)

context =
struct(
Ash.Resource.Change.Context,
%{
bulk?: true,
actor: opts[:actor],
tenant: opts[:tenant],
tracer: opts[:tracer],
authorize?: opts[:authorize?]
}
)

must_be_simple_results =
Enum.flat_map(must_be_simple, fn changeset ->
changeset =
all_changes
|> Enum.flat_map(fn
{%{change: {mod, change_opts}} = change, change_index} ->
index = changeset.context |> Map.get(context_key) |> Map.get(:index)

applicable = changes[change_index]

if applicable == :all || index in applicable do
change_opts =
Ash.Actions.Helpers.templated_opts(
change_opts,
opts[:actor],
changeset.arguments,
changeset.context
)

if mod.batch_callbacks?([changeset], change_opts, context) do
[%{change | change: {mod, change_opts}}]
else
[]
end
else
[]
end

_ ->
[]
end)
|> Enum.reduce(changeset, fn %{change: {mod, change_opts}}, changeset ->
changeset =
if mod.has_after_batch?() do
Ash.Changeset.after_action(changeset, fn changeset, result ->
case mod.after_batch([{changeset, result}], change_opts, context) do
:ok ->
{:ok, result}

enumerable ->
Enum.reduce_while(
enumerable,
{{:ok, result}, []},
fn
%Ash.Notifier.Notification{} = notification, {res, notifications} ->
{:cont, {res, [notification | notifications]}}

{:error, error}, {_res, notifications} ->
{:halt, {{:error, error}, notifications}}

{:ok, result}, {_res, notifications} ->
{:cont, {{:ok, result}, notifications}}
end
)
end
|> case do
{{:ok, res}, notifications} -> {:ok, res, notifications}
{other, _} -> other
end
end)
else
changeset
end

if mod.has_before_batch?() do
Ash.Changeset.before_action(changeset, fn changeset ->
mod.before_batch([changeset], change_opts, context)
|> Enum.reduce(
{changeset, []},
fn
%Ash.Notifier.Notification{} = notification, {changeset, notifications} ->
{changeset, [notification | notifications]}

changeset, {_, notifications} ->
{changeset, notifications}
end
)
end)
else
changeset
end
end)

callback.(changeset)
end)

{batch, must_be_simple_results}
end

def rollback_if_in_transaction(
{:error, %Ash.Error.Changes.StaleRecord{} = error},
_resource,
Expand Down
Loading

0 comments on commit 25b15d3

Please sign in to comment.