Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TECH-41: Implements usage of new limiter's batch API functions #93

Merged
merged 3 commits into from
Nov 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions apps/ff_cth/src/ct_payment_system.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
provider_identity_id => id(),
dummy_provider_identity_id => id(),
optional_apps => list(),
setup_dominant => fun((config()) -> ok)
setup_dominant => fun((config()) -> config())
}.

-opaque system() :: #{
Expand Down Expand Up @@ -62,10 +62,10 @@ do_setup(Options0, C0) ->
{ok, Processing0} = start_processing_apps(Options),
C1 = ct_helper:makeup_cfg([ct_helper:woody_ctx()], [{services, services(Options)} | C0]),
ok = ct_helper:set_context(C1),
ok = setup_dominant(C1, Options),
C2 = setup_dominant(C1, Options),
ok = configure_processing_apps(Options),
ok = ct_helper:unset_context(),
[{payment_system, Processing0} | C1].
[{payment_system, Processing0} | C2].

start_processing_apps(Options) ->
{StartedApps, _StartupCtx} = ct_helper:start_apps([
Expand Down Expand Up @@ -139,18 +139,18 @@ start_optional_apps(#{optional_apps := Apps}) ->
start_optional_apps(_) ->
[].

setup_dominant(Config, Options) ->
ok = setup_dominant_internal(Config, Options),
setup_dominant(Config0, Options) ->
Config1 = setup_dominant_internal(Config0, Options),
DomainConfig = domain_config(Options),
_ = ct_domain_config:upsert(DomainConfig),
DomainConfigUpdate = domain_config_add_version(Options),
_ = ct_domain_config:upsert(DomainConfigUpdate),
ok.
Config1.

setup_dominant_internal(Config, #{setup_dominant := Func}) when is_function(Func, 1) ->
Func(Config);
setup_dominant_internal(_Config, _Options) ->
ok.
setup_dominant_internal(Config, _Options) ->
Config.

configure_processing_apps(Options) ->
ok = set_app_env(
Expand Down
197 changes: 171 additions & 26 deletions apps/ff_transfer/src/ff_limiter.erl
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@
-type limit_amount() :: dmsl_domain_thrift:'Amount'().
-type context() :: limproto_limiter_thrift:'LimitContext'().
-type clock() :: limproto_limiter_thrift:'Clock'().
-type request() :: limproto_limiter_thrift:'LimitRequest'().

-export([get_turnover_limits/1]).
-export([check_limits/3]).
-export([check_limits/4]).
-export([marshal_withdrawal/1]).

-export([hold_withdrawal_limits/4]).
Expand All @@ -38,53 +39,158 @@ get_turnover_limits({value, Limits}) ->
get_turnover_limits(Ambiguous) ->
error({misconfiguration, {'Could not reduce selector to a value', Ambiguous}}).

-spec check_limits([turnover_limit()], route(), withdrawal()) ->
-spec check_limits([turnover_limit()], withdrawal(), route(), pos_integer()) ->
{ok, [limit()]}
| {error, {overflow, [{limit_id(), limit_amount(), turnover_limit_upper_boundary()}]}}.
check_limits(TurnoverLimits, Route, Withdrawal) ->
check_limits(TurnoverLimits, Withdrawal, Route, Iter) ->
Clock = get_latest_clock(),
Context = gen_limit_context(Route, Withdrawal),
case lists:foldl(fun(Limit, Acc) -> check_limits_(Limit, Acc, Context) end, {[], []}, TurnoverLimits) of
LimitValues = collect_limit_values(
Clock, Context, TurnoverLimits, make_operation_segments(Withdrawal, Route, Iter)
),
case lists:foldl(fun(LimitValue, Acc) -> check_limits_(LimitValue, Acc) end, {[], []}, LimitValues) of
{Limits, ErrorList} when length(ErrorList) =:= 0 ->
{ok, Limits};
{_, ErrorList} ->
{error, {overflow, ErrorList}}
end.

check_limits_(T, {Limits, Errors}, Context) ->
#domain_TurnoverLimit{id = LimitID, domain_revision = DomainRevision} = T,
Clock = get_latest_clock(),
Limit = get(LimitID, DomainRevision, Clock, Context),
#limiter_Limit{
amount = LimitAmount
} = Limit,
UpperBoundary = T#domain_TurnoverLimit.upper_boundary,
make_operation_segments(Withdrawal, _Route = #{terminal_id := TerminalID, provider_id := ProviderID}, Iter) ->
[
genlib:to_binary(ProviderID),
genlib:to_binary(TerminalID),
ff_withdrawal:id(Withdrawal)
| case Iter of
1 -> [];
N when N > 1 -> [genlib:to_binary(Iter)]
end
].

collect_limit_values(Clock, Context, TurnoverLimits, OperationIdSegments) ->
{LegacyTurnoverLimits, BatchTurnoverLimits} = split_turnover_limits_by_available_limiter_api(TurnoverLimits),
get_legacy_limit_values(Clock, Context, LegacyTurnoverLimits) ++
get_batch_limit_values(Context, BatchTurnoverLimits, OperationIdSegments).

get_legacy_limit_values(Clock, Context, TurnoverLimits) ->
lists:foldl(
fun(TurnoverLimit, Acc) ->
#domain_TurnoverLimit{id = LimitID, domain_revision = DomainRevision, upper_boundary = UpperBoundary} =
TurnoverLimit,
Limit = get(LimitID, DomainRevision, Clock, Context),
LimitValue = #{
id => LimitID,
boundary => UpperBoundary,
limit => Limit
},
[LimitValue | Acc]
end,
[],
TurnoverLimits
).

get_batch_limit_values(_Context, [], _OperationIdSegments) ->
[];
get_batch_limit_values(Context, TurnoverLimits, OperationIdSegments) ->
{LimitRequest, TurnoverLimitsMap} = prepare_limit_request(TurnoverLimits, OperationIdSegments),
lists:map(
fun(Limit = #limiter_Limit{id = LimitID}) ->
#domain_TurnoverLimit{upper_boundary = UpperBoundary} = maps:get(LimitID, TurnoverLimitsMap),
#{
id => LimitID,
boundary => UpperBoundary,
limit => Limit
}
end,
get_batch(LimitRequest, Context)
).

check_limits_(#{id := LimitID, boundary := UpperBoundary, limit := Limit}, {Limits, Errors}) ->
#limiter_Limit{amount = LimitAmount} = Limit,
case LimitAmount =< UpperBoundary of
true ->
{[Limit | Limits], Errors};
false ->
{Limits, [{LimitID, LimitAmount, UpperBoundary} | Errors]}
end.

-spec hold_withdrawal_limits([turnover_limit()], route(), withdrawal(), pos_integer()) -> ok | no_return().
hold_withdrawal_limits(TurnoverLimits, Route, Withdrawal, Iter) ->
LimitChanges = gen_limit_changes(TurnoverLimits, Route, Withdrawal, Iter),
-spec hold_withdrawal_limits([turnover_limit()], withdrawal(), route(), pos_integer()) -> ok | no_return().
hold_withdrawal_limits(TurnoverLimits, Withdrawal, Route, Iter) ->
Context = gen_limit_context(Route, Withdrawal),
hold(LimitChanges, get_latest_clock(), Context).
{LegacyTurnoverLimits, BatchTurnoverLimits} = split_turnover_limits_by_available_limiter_api(TurnoverLimits),
ok = legacy_hold_withdrawal_limits(Context, LegacyTurnoverLimits, Withdrawal, Route, Iter),
ok = batch_hold_limits(Context, BatchTurnoverLimits, make_operation_segments(Withdrawal, Route, Iter)).

-spec commit_withdrawal_limits([turnover_limit()], route(), withdrawal(), pos_integer()) -> ok.
commit_withdrawal_limits(TurnoverLimits, Route, Withdrawal, Iter) ->
legacy_hold_withdrawal_limits(Context, TurnoverLimits, Withdrawal, Route, Iter) ->
LimitChanges = gen_limit_changes(TurnoverLimits, Route, Withdrawal, Iter),
hold(LimitChanges, get_latest_clock(), Context).

batch_hold_limits(_Context, [], _OperationIdSegments) ->
ok;
batch_hold_limits(Context, TurnoverLimits, OperationIdSegments) ->
{LimitRequest, _} = prepare_limit_request(TurnoverLimits, OperationIdSegments),
_ = hold_batch(LimitRequest, Context),
ok.

-spec commit_withdrawal_limits([turnover_limit()], withdrawal(), route(), pos_integer()) -> ok.
commit_withdrawal_limits(TurnoverLimits, Withdrawal, Route, Iter) ->
Context = gen_limit_context(Route, Withdrawal),
{LegacyTurnoverLimits, BatchTurnoverLimits} = split_turnover_limits_by_available_limiter_api(TurnoverLimits),
Clock = get_latest_clock(),
ok = commit(LimitChanges, Clock, Context),
ok = log_limit_changes(TurnoverLimits, Clock, Context).
ok = legacy_commit_withdrawal_limits(Context, LegacyTurnoverLimits, Withdrawal, Route, Iter),
OperationIdSegments = make_operation_segments(Withdrawal, Route, Iter),
ok = batch_commit_limits(Context, BatchTurnoverLimits, OperationIdSegments),
ok = log_limit_changes(TurnoverLimits, Clock, Context, Withdrawal, Route, Iter).

-spec rollback_withdrawal_limits([turnover_limit()], route(), withdrawal(), pos_integer()) -> ok.
rollback_withdrawal_limits(TurnoverLimits, Route, Withdrawal, Iter) ->
legacy_commit_withdrawal_limits(Context, TurnoverLimits, Withdrawal, Route, Iter) ->
LimitChanges = gen_limit_changes(TurnoverLimits, Route, Withdrawal, Iter),
Clock = get_latest_clock(),
ok = commit(LimitChanges, Clock, Context).

batch_commit_limits(_Context, [], _OperationIdSegments) ->
ok;
batch_commit_limits(Context, TurnoverLimits, OperationIdSegments) ->
{LimitRequest, _} = prepare_limit_request(TurnoverLimits, OperationIdSegments),
_ = commit_batch(LimitRequest, Context),
ok.

-spec rollback_withdrawal_limits([turnover_limit()], withdrawal(), route(), pos_integer()) -> ok.
rollback_withdrawal_limits(TurnoverLimits, Withdrawal, Route, Iter) ->
Context = gen_limit_context(Route, Withdrawal),
{LegacyTurnoverLimits, BatchTurnoverLimits} = split_turnover_limits_by_available_limiter_api(TurnoverLimits),
ok = legacy_rollback_withdrawal_limits(Context, LegacyTurnoverLimits, Withdrawal, Route, Iter),
OperationIdSegments = make_operation_segments(Withdrawal, Route, Iter),
ok = batch_rollback_limits(Context, BatchTurnoverLimits, OperationIdSegments).

legacy_rollback_withdrawal_limits(Context, TurnoverLimits, Withdrawal, Route, Iter) ->
LimitChanges = gen_limit_changes(TurnoverLimits, Route, Withdrawal, Iter),
rollback(LimitChanges, get_latest_clock(), Context).

batch_rollback_limits(_Context, [], _OperationIdSegments) ->
ok;
batch_rollback_limits(Context, TurnoverLimits, OperationIdSegments) ->
{LimitRequest, _} = prepare_limit_request(TurnoverLimits, OperationIdSegments),
rollback_batch(LimitRequest, Context).

split_turnover_limits_by_available_limiter_api(TurnoverLimits) ->
lists:partition(fun(#domain_TurnoverLimit{domain_revision = V}) -> V =:= undefined end, TurnoverLimits).

prepare_limit_request(TurnoverLimits, IdSegments) ->
{TurnoverLimitsIdList, LimitChanges} = lists:unzip(
lists:map(
fun(TurnoverLimit = #domain_TurnoverLimit{id = Id, domain_revision = DomainRevision}) ->
{{Id, TurnoverLimit}, #limiter_LimitChange{id = Id, version = DomainRevision}}
end,
TurnoverLimits
)
),
OperationId = make_operation_id(IdSegments),
LimitRequest = #limiter_LimitRequest{operation_id = OperationId, limit_changes = LimitChanges},
TurnoverLimitsMap = maps:from_list(TurnoverLimitsIdList),
{LimitRequest, TurnoverLimitsMap}.

make_operation_id(IdSegments) ->
construct_complex_id([<<"limiter">>, <<"batch-request">>] ++ IdSegments).

-spec hold([limit_change()], clock(), context()) -> ok | no_return().
hold(LimitChanges, Clock, Context) ->
lists:foreach(
Expand Down Expand Up @@ -233,21 +339,43 @@ call_rollback(LimitChange, Clock, Context) ->
{exception, #limiter_PaymentToolNotSupported{}} -> {latest, #limiter_LatestClock{}}
end.

-spec get_batch(request(), context()) -> [limit()] | no_return().
get_batch(Request, Context) ->
{ok, Limits} = call_w_request('GetBatch', Request, Context),
Limits.

-spec hold_batch(request(), context()) -> [limit()] | no_return().
hold_batch(Request, Context) ->
{ok, Limits} = call_w_request('HoldBatch', Request, Context),
Limits.

-spec commit_batch(request(), context()) -> ok | no_return().
commit_batch(Request, Context) ->
{ok, ok} = call_w_request('CommitBatch', Request, Context),
ok.

-spec rollback_batch(request(), context()) -> ok | no_return().
rollback_batch(Request, Context) ->
{ok, ok} = call_w_request('RollbackBatch', Request, Context),
ok.

call(Func, Args) ->
Service = {limproto_limiter_thrift, 'Limiter'},
Request = {Service, Func, Args},
ff_woody_client:call(limiter, Request).

log_limit_changes(TurnoverLimits, Clock, Context) ->
log_limit_changes(TurnoverLimits, Clock, Context, Withdrawal, Route, Iter) ->
LimitValues = collect_limit_values(
Clock, Context, TurnoverLimits, make_operation_segments(Withdrawal, Route, Iter)
),
Attrs = mk_limit_log_attributes(Context),
lists:foreach(
fun(#domain_TurnoverLimit{id = ID, upper_boundary = UpperBoundary, domain_revision = DomainRevision}) ->
#limiter_Limit{amount = LimitAmount} = get(ID, DomainRevision, Clock, Context),
fun(#{id := ID, boundary := UpperBoundary, limit := #limiter_Limit{amount = LimitAmount}}) ->
ok = logger:log(notice, "Limit change commited", [], #{
limit => Attrs#{config_id => ID, boundary => UpperBoundary, amount => LimitAmount}
})
end,
TurnoverLimits
LimitValues
).

mk_limit_log_attributes(#limiter_LimitContext{
Expand Down Expand Up @@ -276,3 +404,20 @@ mk_limit_log_attributes(#limiter_LimitContext{
currency => Currency#domain_CurrencyRef.symbolic_code
}
}.

call_w_request(Function, Request, Context) ->
case call(Function, {Request, Context}) of
{exception, #limiter_LimitNotFound{}} ->
error(not_found);
{exception, #base_InvalidRequest{errors = Errors}} ->
error({invalid_request, Errors});
{exception, Exception} ->
%% NOTE Uniform handling of more specific exceptions:
%% LimitChangeNotFound
%% InvalidOperationCurrency
%% OperationContextNotSupported
%% PaymentToolNotSupported
error(Exception);
{ok, _} = Result ->
Result
end.
8 changes: 4 additions & 4 deletions apps/ff_transfer/src/ff_withdrawal_routing.erl
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ do_rollback_limits(CombinedTerms, _PartyVarset, Route, #{withdrawal := Withdrawa
turnover_limit = TurnoverLimit
} = CombinedTerms,
Limits = ff_limiter:get_turnover_limits(TurnoverLimit),
ff_limiter:rollback_withdrawal_limits(Limits, Route, Withdrawal, Iter).
ff_limiter:rollback_withdrawal_limits(Limits, Withdrawal, Route, Iter).

-spec do_commit_limits(withdrawal_provision_terms(), party_varset(), route(), routing_context()) ->
ok.
Expand All @@ -299,7 +299,7 @@ do_commit_limits(CombinedTerms, _PartyVarset, Route, #{withdrawal := Withdrawal,
turnover_limit = TurnoverLimit
} = CombinedTerms,
Limits = ff_limiter:get_turnover_limits(TurnoverLimit),
ff_limiter:commit_withdrawal_limits(Limits, Route, Withdrawal, Iter).
ff_limiter:commit_withdrawal_limits(Limits, Withdrawal, Route, Iter).

-spec do_validate_limits(withdrawal_provision_terms(), party_varset(), route(), routing_context()) ->
{ok, valid}
Expand Down Expand Up @@ -391,8 +391,8 @@ validate_turnover_limits(undefined, _VS, _Route, _RoutingContext) ->
{ok, valid};
validate_turnover_limits({value, TurnoverLimits}, _VS, Route, #{withdrawal := Withdrawal, iteration := Iter}) ->
try
ok = ff_limiter:hold_withdrawal_limits(TurnoverLimits, Route, Withdrawal, Iter),
case ff_limiter:check_limits(TurnoverLimits, Route, Withdrawal) of
ok = ff_limiter:hold_withdrawal_limits(TurnoverLimits, Withdrawal, Route, Iter),
case ff_limiter:check_limits(TurnoverLimits, Withdrawal, Route, Iter) of
{ok, _} ->
{ok, valid};
{error, Error} ->
Expand Down
20 changes: 18 additions & 2 deletions apps/ff_transfer/test/ff_ct_limiter_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

-export([get/4]).

%% TODO Remove obsolete functions
-export([create_config/2]).
-export([get_config/2]).

Expand All @@ -17,9 +18,24 @@

%%% API

-spec get(limit_id(), limit_version(), limit_context(), client()) -> woody:result() | no_return().
-define(PLACEHOLDER_OPERATION_GET_LIMIT_VALUES, <<"get values">>).

-spec get(limit_id(), limit_version() | undefined, limit_context(), client()) -> woody:result() | no_return().
get(LimitID, undefined, Context, Client) ->
call('GetVersioned', {LimitID, undefined, clock(), Context}, Client);
get(LimitID, Version, Context, Client) ->
call('GetVersioned', {LimitID, Version, clock(), Context}, Client).
LimitRequest = #limiter_LimitRequest{
operation_id = ?PLACEHOLDER_OPERATION_GET_LIMIT_VALUES,
limit_changes = [#limiter_LimitChange{id = LimitID, version = Version}]
},
case call('GetValues', {LimitRequest, Context}, Client) of
{ok, [L]} ->
{ok, L};
{ok, []} ->
{exception, #limiter_LimitNotFound{}};
{exception, _} = Exception ->
Exception
end.

-spec create_config(limit_config_params(), client()) -> woody:result() | no_return().
create_config(LimitCreateParams, Client) ->
Expand Down
Loading
Loading