From 2ad8bd5b045eb835b5fcb8c7450e5896474f676c Mon Sep 17 00:00:00 2001 From: Aleksey Kashapov Date: Sun, 24 Nov 2024 22:09:28 +0300 Subject: [PATCH 1/3] TECH-41: Implements usage of new limiter's batch API functions --- apps/ff_transfer/src/ff_limiter.erl | 188 ++++++++++++++++-- .../ff_transfer/src/ff_withdrawal_routing.erl | 8 +- compose.yaml | 32 +++ rebar.lock | 2 +- 4 files changed, 205 insertions(+), 25 deletions(-) diff --git a/apps/ff_transfer/src/ff_limiter.erl b/apps/ff_transfer/src/ff_limiter.erl index b2e0ae67..6fd412b1 100644 --- a/apps/ff_transfer/src/ff_limiter.erl +++ b/apps/ff_transfer/src/ff_limiter.erl @@ -21,9 +21,10 @@ -type limit_amount() :: dmsl_domain_thrift:'Amount'(). -type context() :: limproto_limiter_thrift:'LimitContext'(). -type clock() :: limproto_limiter_thrift:'Clock'(). +-type request() :: limproto_limiter_thrift:'LimitRequest'(). -export([get_turnover_limits/1]). --export([check_limits/3]). +-export([check_limits/4]). -export([marshal_withdrawal/1]). -export([hold_withdrawal_limits/4]). @@ -38,26 +39,71 @@ get_turnover_limits({value, Limits}) -> get_turnover_limits(Ambiguous) -> error({misconfiguration, {'Could not reduce selector to a value', Ambiguous}}). --spec check_limits([turnover_limit()], route(), withdrawal()) -> +-spec check_limits([turnover_limit()], withdrawal(), route(), pos_integer()) -> {ok, [limit()]} | {error, {overflow, [{limit_id(), limit_amount(), turnover_limit_upper_boundary()}]}}. -check_limits(TurnoverLimits, Route, Withdrawal) -> +check_limits(TurnoverLimits, Withdrawal, Route, Iter) -> Context = gen_limit_context(Route, Withdrawal), - case lists:foldl(fun(Limit, Acc) -> check_limits_(Limit, Acc, Context) end, {[], []}, TurnoverLimits) of + LimitValues = collect_limit_values(Context, TurnoverLimits, make_operation_segments(Withdrawal, Route, Iter)), + case lists:foldl(fun(LimitValue, Acc) -> check_limits_(LimitValue, Acc) end, {[], []}, LimitValues) of {Limits, ErrorList} when length(ErrorList) =:= 0 -> {ok, Limits}; {_, ErrorList} -> {error, {overflow, ErrorList}} end. -check_limits_(T, {Limits, Errors}, Context) -> - #domain_TurnoverLimit{id = LimitID, domain_revision = DomainRevision} = T, +make_operation_segments(Withdrawal, _Route = #{terminal_id := TerminalID, provider_id := ProviderID}, Iter) -> + [ + genlib:to_binary(ProviderID), + genlib:to_binary(TerminalID), + ff_withdrawal:id(Withdrawal) + | case Iter of + 1 -> []; + N when N > 1 -> [genlib:to_binary(Iter)] + end + ]. + +collect_limit_values(Context, TurnoverLimits, OperationIdSegments) -> + {LegacyTurnoverLimits, BatchTurnoverLimits} = split_turnover_limits_by_available_limiter_api(TurnoverLimits), + get_legacy_limit_values(Context, LegacyTurnoverLimits) ++ + get_batch_limit_values(Context, BatchTurnoverLimits, OperationIdSegments). + +get_legacy_limit_values(Context, TurnoverLimits) -> Clock = get_latest_clock(), - Limit = get(LimitID, DomainRevision, Clock, Context), - #limiter_Limit{ - amount = LimitAmount - } = Limit, - UpperBoundary = T#domain_TurnoverLimit.upper_boundary, + lists:foldl( + fun(TurnoverLimit, Acc) -> + #domain_TurnoverLimit{id = LimitID, domain_revision = DomainRevision, upper_boundary = UpperBoundary} = + TurnoverLimit, + Limit = get(LimitID, DomainRevision, Clock, Context), + LimitValue = #{ + id => LimitID, + boundary => UpperBoundary, + limit => Limit + }, + [LimitValue | Acc] + end, + [], + TurnoverLimits + ). + +get_batch_limit_values(_Context, [], _OperationIdSegments) -> + []; +get_batch_limit_values(Context, TurnoverLimits, OperationIdSegments) -> + {LimitRequest, TurnoverLimitsMap} = prepare_limit_request(TurnoverLimits, OperationIdSegments), + lists:map( + fun(Limit = #limiter_Limit{id = LimitID}) -> + #domain_TurnoverLimit{upper_boundary = UpperBoundary} = maps:get(LimitID, TurnoverLimitsMap), + #{ + id => LimitID, + boundary => UpperBoundary, + limit => Limit + } + end, + get_values(LimitRequest, Context) + ). + +check_limits_(#{id := LimitID, boundary := UpperBoundary, limit := Limit}, {Limits, Errors}) -> + #limiter_Limit{amount = LimitAmount} = Limit, case LimitAmount =< UpperBoundary of true -> {[Limit | Limits], Errors}; @@ -65,26 +111,83 @@ check_limits_(T, {Limits, Errors}, Context) -> {Limits, [{LimitID, LimitAmount, UpperBoundary} | Errors]} end. --spec hold_withdrawal_limits([turnover_limit()], route(), withdrawal(), pos_integer()) -> ok | no_return(). -hold_withdrawal_limits(TurnoverLimits, Route, Withdrawal, Iter) -> - LimitChanges = gen_limit_changes(TurnoverLimits, Route, Withdrawal, Iter), +-spec hold_withdrawal_limits([turnover_limit()], withdrawal(), route(), pos_integer()) -> ok | no_return(). +hold_withdrawal_limits(TurnoverLimits, Withdrawal, Route, Iter) -> Context = gen_limit_context(Route, Withdrawal), - hold(LimitChanges, get_latest_clock(), Context). + {LegacyTurnoverLimits, BatchTurnoverLimits} = split_turnover_limits_by_available_limiter_api(TurnoverLimits), + ok = legacy_hold_withdrawal_limits(Context, LegacyTurnoverLimits, Withdrawal, Route, Iter), + ok = batch_hold_limits(Context, BatchTurnoverLimits, make_operation_segments(Withdrawal, Route, Iter)). --spec commit_withdrawal_limits([turnover_limit()], route(), withdrawal(), pos_integer()) -> ok. -commit_withdrawal_limits(TurnoverLimits, Route, Withdrawal, Iter) -> +legacy_hold_withdrawal_limits(Context, TurnoverLimits, Withdrawal, Route, Iter) -> LimitChanges = gen_limit_changes(TurnoverLimits, Route, Withdrawal, Iter), + hold(LimitChanges, get_latest_clock(), Context). + +batch_hold_limits(_Context, [], _OperationIdSegments) -> + ok; +batch_hold_limits(Context, TurnoverLimits, OperationIdSegments) -> + {LimitRequest, _} = prepare_limit_request(TurnoverLimits, OperationIdSegments), + hold_batch(LimitRequest, Context). + +-spec commit_withdrawal_limits([turnover_limit()], withdrawal(), route(), pos_integer()) -> ok. +commit_withdrawal_limits(TurnoverLimits, Withdrawal, Route, Iter) -> Context = gen_limit_context(Route, Withdrawal), + {LegacyTurnoverLimits, BatchTurnoverLimits} = split_turnover_limits_by_available_limiter_api(TurnoverLimits), Clock = get_latest_clock(), - ok = commit(LimitChanges, Clock, Context), + ok = legacy_commit_withdrawal_limits(Context, LegacyTurnoverLimits, Withdrawal, Route, Iter), + OperationIdSegments = make_operation_segments(Withdrawal, Route, Iter), + ok = batch_commit_limits(Context, BatchTurnoverLimits, OperationIdSegments), ok = log_limit_changes(TurnoverLimits, Clock, Context). --spec rollback_withdrawal_limits([turnover_limit()], route(), withdrawal(), pos_integer()) -> ok. -rollback_withdrawal_limits(TurnoverLimits, Route, Withdrawal, Iter) -> +legacy_commit_withdrawal_limits(Context, TurnoverLimits, Withdrawal, Route, Iter) -> LimitChanges = gen_limit_changes(TurnoverLimits, Route, Withdrawal, Iter), + Clock = get_latest_clock(), + ok = commit(LimitChanges, Clock, Context), + ok = log_limit_changes(TurnoverLimits, Clock, Context). + +batch_commit_limits(_Context, [], _OperationIdSegments) -> + ok; +batch_commit_limits(Context, TurnoverLimits, OperationIdSegments) -> + {LimitRequest, _} = prepare_limit_request(TurnoverLimits, OperationIdSegments), + commit_batch(LimitRequest, Context). + +-spec rollback_withdrawal_limits([turnover_limit()], withdrawal(), route(), pos_integer()) -> ok. +rollback_withdrawal_limits(TurnoverLimits, Withdrawal, Route, Iter) -> Context = gen_limit_context(Route, Withdrawal), + {LegacyTurnoverLimits, BatchTurnoverLimits} = split_turnover_limits_by_available_limiter_api(TurnoverLimits), + ok = legacy_rollback_withdrawal_limits(Context, LegacyTurnoverLimits, Withdrawal, Route, Iter), + OperationIdSegments = make_operation_segments(Withdrawal, Route, Iter), + ok = batch_rollback_limits(Context, BatchTurnoverLimits, OperationIdSegments). + +legacy_rollback_withdrawal_limits(Context, TurnoverLimits, Withdrawal, Route, Iter) -> + LimitChanges = gen_limit_changes(TurnoverLimits, Route, Withdrawal, Iter), rollback(LimitChanges, get_latest_clock(), Context). +batch_rollback_limits(_Context, [], _OperationIdSegments) -> + ok; +batch_rollback_limits(Context, TurnoverLimits, OperationIdSegments) -> + {LimitRequest, _} = prepare_limit_request(TurnoverLimits, OperationIdSegments), + rollback_batch(LimitRequest, Context). + +split_turnover_limits_by_available_limiter_api(TurnoverLimits) -> + lists:partition(fun(#domain_TurnoverLimit{domain_revision = V}) -> V =:= undefined end, TurnoverLimits). + +prepare_limit_request(TurnoverLimits, IdSegments) -> + {TurnoverLimitsIdList, LimitChanges} = lists:unzip( + lists:map( + fun(TurnoverLimit = #domain_TurnoverLimit{id = Id, domain_revision = DomainRevision}) -> + {{Id, TurnoverLimit}, #limiter_LimitChange{id = Id, version = DomainRevision}} + end, + TurnoverLimits + ) + ), + OperationId = make_operation_id(IdSegments), + LimitRequest = #limiter_LimitRequest{operation_id = OperationId, limit_changes = LimitChanges}, + TurnoverLimitsMap = maps:from_list(TurnoverLimitsIdList), + {LimitRequest, TurnoverLimitsMap}. + +make_operation_id(IdSegments) -> + construct_complex_id([<<"limiter">>, <<"batch-request">>] ++ IdSegments). + -spec hold([limit_change()], clock(), context()) -> ok | no_return(). hold(LimitChanges, Clock, Context) -> lists:foreach( @@ -233,6 +336,31 @@ call_rollback(LimitChange, Clock, Context) -> {exception, #limiter_PaymentToolNotSupported{}} -> {latest, #limiter_LatestClock{}} end. +-spec get_values(request(), context()) -> [limit()] | no_return(). +get_values(Request, Context) -> + {ok, Limits} = call_w_request('GetValues', Request, Context), + Limits. + +%% -spec get_batch(request(), context()) -> [limit()] | no_return(). +%% get_batch(Request, Context) -> +%% {ok, Limits} = call_w_request('GetBatch', Request, Context), +%% Limits. + +-spec hold_batch(request(), context()) -> [limit()] | no_return(). +hold_batch(Request, Context) -> + {ok, Limits} = call_w_request('HoldBatch', Request, Context), + Limits. + +-spec commit_batch(request(), context()) -> ok | no_return(). +commit_batch(Request, Context) -> + {ok, ok} = call_w_request('CommitBatch', Request, Context), + ok. + +-spec rollback_batch(request(), context()) -> ok | no_return(). +rollback_batch(Request, Context) -> + {ok, ok} = call_w_request('RollbackBatch', Request, Context), + ok. + call(Func, Args) -> Service = {limproto_limiter_thrift, 'Limiter'}, Request = {Service, Func, Args}, @@ -276,3 +404,23 @@ mk_limit_log_attributes(#limiter_LimitContext{ currency => Currency#domain_CurrencyRef.symbolic_code } }. + +call_w_request(Function, LimitRequest, Context) -> + Service = {limproto_limiter_thrift, 'Limiter'}, + Args = {LimitRequest, Context}, + Request = {Service, Function, Args}, + case ff_woody_client:call(limiter, Request) of + {exception, #limiter_LimitNotFound{}} -> + error(not_found); + {exception, #base_InvalidRequest{errors = Errors}} -> + error({invalid_request, Errors}); + {exception, Exception} -> + %% NOTE Uniform handling of more specific exceptions: + %% LimitChangeNotFound + %% InvalidOperationCurrency + %% OperationContextNotSupported + %% PaymentToolNotSupported + error(Exception); + {ok, _} = Result -> + Result + end. diff --git a/apps/ff_transfer/src/ff_withdrawal_routing.erl b/apps/ff_transfer/src/ff_withdrawal_routing.erl index 1a43dc6f..a8f48f33 100644 --- a/apps/ff_transfer/src/ff_withdrawal_routing.erl +++ b/apps/ff_transfer/src/ff_withdrawal_routing.erl @@ -290,7 +290,7 @@ do_rollback_limits(CombinedTerms, _PartyVarset, Route, #{withdrawal := Withdrawa turnover_limit = TurnoverLimit } = CombinedTerms, Limits = ff_limiter:get_turnover_limits(TurnoverLimit), - ff_limiter:rollback_withdrawal_limits(Limits, Route, Withdrawal, Iter). + ff_limiter:rollback_withdrawal_limits(Limits, Withdrawal, Route, Iter). -spec do_commit_limits(withdrawal_provision_terms(), party_varset(), route(), routing_context()) -> ok. @@ -299,7 +299,7 @@ do_commit_limits(CombinedTerms, _PartyVarset, Route, #{withdrawal := Withdrawal, turnover_limit = TurnoverLimit } = CombinedTerms, Limits = ff_limiter:get_turnover_limits(TurnoverLimit), - ff_limiter:commit_withdrawal_limits(Limits, Route, Withdrawal, Iter). + ff_limiter:commit_withdrawal_limits(Limits, Withdrawal, Route, Iter). -spec do_validate_limits(withdrawal_provision_terms(), party_varset(), route(), routing_context()) -> {ok, valid} @@ -391,8 +391,8 @@ validate_turnover_limits(undefined, _VS, _Route, _RoutingContext) -> {ok, valid}; validate_turnover_limits({value, TurnoverLimits}, _VS, Route, #{withdrawal := Withdrawal, iteration := Iter}) -> try - ok = ff_limiter:hold_withdrawal_limits(TurnoverLimits, Route, Withdrawal, Iter), - case ff_limiter:check_limits(TurnoverLimits, Route, Withdrawal) of + ok = ff_limiter:hold_withdrawal_limits(TurnoverLimits, Withdrawal, Route, Iter), + case ff_limiter:check_limits(TurnoverLimits, Withdrawal, Route, Iter) of {ok, _} -> {ok, valid}; {error, Error} -> diff --git a/compose.yaml b/compose.yaml index c58d90fb..951d661e 100644 --- a/compose.yaml +++ b/compose.yaml @@ -59,6 +59,8 @@ services: condition: service_healthy shumway: condition: service_started + liminator: + condition: service_healthy healthcheck: test: "/opt/limiter/bin/limiter ping" interval: 5s @@ -85,6 +87,36 @@ services: healthcheck: disable: true + liminator: + image: ghcr.io/valitydev/liminator:sha-fc6546f + restart: unless-stopped + entrypoint: + - java + - -Xmx512m + - -jar + - /opt/liminator/liminator.jar + - --spring.datasource.url=jdbc:postgresql://liminator-db:5432/liminator + - --spring.datasource.username=vality + - --spring.datasource.password=postgres + - --spring.flyway.url=jdbc:postgresql://liminator-db:5432/liminator + - --spring.flyway.username=vality + - --spring.flyway.password=postgres + - --service.skipExistedHoldOps=false + depends_on: + - liminator-db + healthcheck: + test: "curl http://localhost:8022/actuator/health" + interval: 5s + timeout: 1s + retries: 20 + + liminator-db: + image: docker.io/library/postgres:13.10 + environment: + - POSTGRES_DB=liminator + - POSTGRES_USER=vality + - POSTGRES_PASSWORD=postgres + party-management: image: ghcr.io/valitydev/party-management:sha-9af7d71 command: /opt/party-management/bin/party-management foreground diff --git a/rebar.lock b/rebar.lock index e6453975..d15999a5 100644 --- a/rebar.lock +++ b/rebar.lock @@ -55,7 +55,7 @@ {<<"jsx">>,{pkg,<<"jsx">>,<<"3.1.0">>},1}, {<<"limiter_proto">>, {git,"https://github.com/valitydev/limiter-proto.git", - {ref,"e045813d32e67432e5592d582e59e45df05da647"}}, + {ref,"970f197ce6c527fee5c45237ad2ce4b8820184a1"}}, 0}, {<<"machinery">>, {git,"https://github.com/valitydev/machinery-erlang.git", From 0e2e312a3f68cf2fd019722f87fd974f8cace6a5 Mon Sep 17 00:00:00 2001 From: Aleksey Kashapov Date: Sun, 24 Nov 2024 22:11:04 +0300 Subject: [PATCH 2/3] Simplify 'call_w_request' --- apps/ff_transfer/src/ff_limiter.erl | 7 ++----- 1 file changed, 2 insertions(+), 5 deletions(-) diff --git a/apps/ff_transfer/src/ff_limiter.erl b/apps/ff_transfer/src/ff_limiter.erl index 6fd412b1..aed89356 100644 --- a/apps/ff_transfer/src/ff_limiter.erl +++ b/apps/ff_transfer/src/ff_limiter.erl @@ -405,11 +405,8 @@ mk_limit_log_attributes(#limiter_LimitContext{ } }. -call_w_request(Function, LimitRequest, Context) -> - Service = {limproto_limiter_thrift, 'Limiter'}, - Args = {LimitRequest, Context}, - Request = {Service, Function, Args}, - case ff_woody_client:call(limiter, Request) of +call_w_request(Function, Request, Context) -> + case call(Function, {Request, Context}) of {exception, #limiter_LimitNotFound{}} -> error(not_found); {exception, #base_InvalidRequest{errors = Errors}} -> From 3dfcfbd887f9ae935384d8f2120e2c537847f4e1 Mon Sep 17 00:00:00 2001 From: Aleksey Kashapov Date: Mon, 25 Nov 2024 20:46:14 +0300 Subject: [PATCH 3/3] Updates limits testcases --- apps/ff_cth/src/ct_payment_system.erl | 16 ++-- apps/ff_transfer/src/ff_limiter.erl | 46 ++++++------ .../ff_transfer/test/ff_ct_limiter_client.erl | 20 ++++- apps/ff_transfer/test/ff_limiter_helper.erl | 50 ++++++++----- .../test/ff_withdrawal_limits_SUITE.erl | 74 ++++++++++++------- compose.yaml | 2 +- 6 files changed, 130 insertions(+), 78 deletions(-) diff --git a/apps/ff_cth/src/ct_payment_system.erl b/apps/ff_cth/src/ct_payment_system.erl index b461ff19..526091a7 100644 --- a/apps/ff_cth/src/ct_payment_system.erl +++ b/apps/ff_cth/src/ct_payment_system.erl @@ -17,7 +17,7 @@ provider_identity_id => id(), dummy_provider_identity_id => id(), optional_apps => list(), - setup_dominant => fun((config()) -> ok) + setup_dominant => fun((config()) -> config()) }. -opaque system() :: #{ @@ -62,10 +62,10 @@ do_setup(Options0, C0) -> {ok, Processing0} = start_processing_apps(Options), C1 = ct_helper:makeup_cfg([ct_helper:woody_ctx()], [{services, services(Options)} | C0]), ok = ct_helper:set_context(C1), - ok = setup_dominant(C1, Options), + C2 = setup_dominant(C1, Options), ok = configure_processing_apps(Options), ok = ct_helper:unset_context(), - [{payment_system, Processing0} | C1]. + [{payment_system, Processing0} | C2]. start_processing_apps(Options) -> {StartedApps, _StartupCtx} = ct_helper:start_apps([ @@ -139,18 +139,18 @@ start_optional_apps(#{optional_apps := Apps}) -> start_optional_apps(_) -> []. -setup_dominant(Config, Options) -> - ok = setup_dominant_internal(Config, Options), +setup_dominant(Config0, Options) -> + Config1 = setup_dominant_internal(Config0, Options), DomainConfig = domain_config(Options), _ = ct_domain_config:upsert(DomainConfig), DomainConfigUpdate = domain_config_add_version(Options), _ = ct_domain_config:upsert(DomainConfigUpdate), - ok. + Config1. setup_dominant_internal(Config, #{setup_dominant := Func}) when is_function(Func, 1) -> Func(Config); -setup_dominant_internal(_Config, _Options) -> - ok. +setup_dominant_internal(Config, _Options) -> + Config. configure_processing_apps(Options) -> ok = set_app_env( diff --git a/apps/ff_transfer/src/ff_limiter.erl b/apps/ff_transfer/src/ff_limiter.erl index aed89356..1704405b 100644 --- a/apps/ff_transfer/src/ff_limiter.erl +++ b/apps/ff_transfer/src/ff_limiter.erl @@ -43,8 +43,11 @@ get_turnover_limits(Ambiguous) -> {ok, [limit()]} | {error, {overflow, [{limit_id(), limit_amount(), turnover_limit_upper_boundary()}]}}. check_limits(TurnoverLimits, Withdrawal, Route, Iter) -> + Clock = get_latest_clock(), Context = gen_limit_context(Route, Withdrawal), - LimitValues = collect_limit_values(Context, TurnoverLimits, make_operation_segments(Withdrawal, Route, Iter)), + LimitValues = collect_limit_values( + Clock, Context, TurnoverLimits, make_operation_segments(Withdrawal, Route, Iter) + ), case lists:foldl(fun(LimitValue, Acc) -> check_limits_(LimitValue, Acc) end, {[], []}, LimitValues) of {Limits, ErrorList} when length(ErrorList) =:= 0 -> {ok, Limits}; @@ -63,13 +66,12 @@ make_operation_segments(Withdrawal, _Route = #{terminal_id := TerminalID, provid end ]. -collect_limit_values(Context, TurnoverLimits, OperationIdSegments) -> +collect_limit_values(Clock, Context, TurnoverLimits, OperationIdSegments) -> {LegacyTurnoverLimits, BatchTurnoverLimits} = split_turnover_limits_by_available_limiter_api(TurnoverLimits), - get_legacy_limit_values(Context, LegacyTurnoverLimits) ++ + get_legacy_limit_values(Clock, Context, LegacyTurnoverLimits) ++ get_batch_limit_values(Context, BatchTurnoverLimits, OperationIdSegments). -get_legacy_limit_values(Context, TurnoverLimits) -> - Clock = get_latest_clock(), +get_legacy_limit_values(Clock, Context, TurnoverLimits) -> lists:foldl( fun(TurnoverLimit, Acc) -> #domain_TurnoverLimit{id = LimitID, domain_revision = DomainRevision, upper_boundary = UpperBoundary} = @@ -99,7 +101,7 @@ get_batch_limit_values(Context, TurnoverLimits, OperationIdSegments) -> limit => Limit } end, - get_values(LimitRequest, Context) + get_batch(LimitRequest, Context) ). check_limits_(#{id := LimitID, boundary := UpperBoundary, limit := Limit}, {Limits, Errors}) -> @@ -126,7 +128,8 @@ batch_hold_limits(_Context, [], _OperationIdSegments) -> ok; batch_hold_limits(Context, TurnoverLimits, OperationIdSegments) -> {LimitRequest, _} = prepare_limit_request(TurnoverLimits, OperationIdSegments), - hold_batch(LimitRequest, Context). + _ = hold_batch(LimitRequest, Context), + ok. -spec commit_withdrawal_limits([turnover_limit()], withdrawal(), route(), pos_integer()) -> ok. commit_withdrawal_limits(TurnoverLimits, Withdrawal, Route, Iter) -> @@ -136,19 +139,19 @@ commit_withdrawal_limits(TurnoverLimits, Withdrawal, Route, Iter) -> ok = legacy_commit_withdrawal_limits(Context, LegacyTurnoverLimits, Withdrawal, Route, Iter), OperationIdSegments = make_operation_segments(Withdrawal, Route, Iter), ok = batch_commit_limits(Context, BatchTurnoverLimits, OperationIdSegments), - ok = log_limit_changes(TurnoverLimits, Clock, Context). + ok = log_limit_changes(TurnoverLimits, Clock, Context, Withdrawal, Route, Iter). legacy_commit_withdrawal_limits(Context, TurnoverLimits, Withdrawal, Route, Iter) -> LimitChanges = gen_limit_changes(TurnoverLimits, Route, Withdrawal, Iter), Clock = get_latest_clock(), - ok = commit(LimitChanges, Clock, Context), - ok = log_limit_changes(TurnoverLimits, Clock, Context). + ok = commit(LimitChanges, Clock, Context). batch_commit_limits(_Context, [], _OperationIdSegments) -> ok; batch_commit_limits(Context, TurnoverLimits, OperationIdSegments) -> {LimitRequest, _} = prepare_limit_request(TurnoverLimits, OperationIdSegments), - commit_batch(LimitRequest, Context). + _ = commit_batch(LimitRequest, Context), + ok. -spec rollback_withdrawal_limits([turnover_limit()], withdrawal(), route(), pos_integer()) -> ok. rollback_withdrawal_limits(TurnoverLimits, Withdrawal, Route, Iter) -> @@ -336,16 +339,11 @@ call_rollback(LimitChange, Clock, Context) -> {exception, #limiter_PaymentToolNotSupported{}} -> {latest, #limiter_LatestClock{}} end. --spec get_values(request(), context()) -> [limit()] | no_return(). -get_values(Request, Context) -> - {ok, Limits} = call_w_request('GetValues', Request, Context), +-spec get_batch(request(), context()) -> [limit()] | no_return(). +get_batch(Request, Context) -> + {ok, Limits} = call_w_request('GetBatch', Request, Context), Limits. -%% -spec get_batch(request(), context()) -> [limit()] | no_return(). -%% get_batch(Request, Context) -> -%% {ok, Limits} = call_w_request('GetBatch', Request, Context), -%% Limits. - -spec hold_batch(request(), context()) -> [limit()] | no_return(). hold_batch(Request, Context) -> {ok, Limits} = call_w_request('HoldBatch', Request, Context), @@ -366,16 +364,18 @@ call(Func, Args) -> Request = {Service, Func, Args}, ff_woody_client:call(limiter, Request). -log_limit_changes(TurnoverLimits, Clock, Context) -> +log_limit_changes(TurnoverLimits, Clock, Context, Withdrawal, Route, Iter) -> + LimitValues = collect_limit_values( + Clock, Context, TurnoverLimits, make_operation_segments(Withdrawal, Route, Iter) + ), Attrs = mk_limit_log_attributes(Context), lists:foreach( - fun(#domain_TurnoverLimit{id = ID, upper_boundary = UpperBoundary, domain_revision = DomainRevision}) -> - #limiter_Limit{amount = LimitAmount} = get(ID, DomainRevision, Clock, Context), + fun(#{id := ID, boundary := UpperBoundary, limit := #limiter_Limit{amount = LimitAmount}}) -> ok = logger:log(notice, "Limit change commited", [], #{ limit => Attrs#{config_id => ID, boundary => UpperBoundary, amount => LimitAmount} }) end, - TurnoverLimits + LimitValues ). mk_limit_log_attributes(#limiter_LimitContext{ diff --git a/apps/ff_transfer/test/ff_ct_limiter_client.erl b/apps/ff_transfer/test/ff_ct_limiter_client.erl index 4d9f0f2a..127530d2 100644 --- a/apps/ff_transfer/test/ff_ct_limiter_client.erl +++ b/apps/ff_transfer/test/ff_ct_limiter_client.erl @@ -4,6 +4,7 @@ -export([get/4]). +%% TODO Remove obsolete functions -export([create_config/2]). -export([get_config/2]). @@ -17,9 +18,24 @@ %%% API --spec get(limit_id(), limit_version(), limit_context(), client()) -> woody:result() | no_return(). +-define(PLACEHOLDER_OPERATION_GET_LIMIT_VALUES, <<"get values">>). + +-spec get(limit_id(), limit_version() | undefined, limit_context(), client()) -> woody:result() | no_return(). +get(LimitID, undefined, Context, Client) -> + call('GetVersioned', {LimitID, undefined, clock(), Context}, Client); get(LimitID, Version, Context, Client) -> - call('GetVersioned', {LimitID, Version, clock(), Context}, Client). + LimitRequest = #limiter_LimitRequest{ + operation_id = ?PLACEHOLDER_OPERATION_GET_LIMIT_VALUES, + limit_changes = [#limiter_LimitChange{id = LimitID, version = Version}] + }, + case call('GetValues', {LimitRequest, Context}, Client) of + {ok, [L]} -> + {ok, L}; + {ok, []} -> + {exception, #limiter_LimitNotFound{}}; + {exception, _} = Exception -> + Exception + end. -spec create_config(limit_config_params(), client()) -> woody:result() | no_return(). create_config(LimitCreateParams, Client) -> diff --git a/apps/ff_transfer/test/ff_limiter_helper.erl b/apps/ff_transfer/test/ff_limiter_helper.erl index 215d1d5c..9b2a5463 100644 --- a/apps/ff_transfer/test/ff_limiter_helper.erl +++ b/apps/ff_transfer/test/ff_limiter_helper.erl @@ -8,31 +8,35 @@ -include_lib("ff_cth/include/ct_domain.hrl"). -export([init_per_suite/1]). --export([get_limit_amount/3]). --export([get_limit/3]). +-export([get_limit_amount/4]). +-export([get_limit/4]). -type withdrawal() :: ff_withdrawal:withdrawal_state() | dmsl_wthd_domain_thrift:'Withdrawal'(). -type limit() :: limproto_limiter_thrift:'Limit'(). -type config() :: ct_suite:ct_config(). -type id() :: binary(). +-define(PLACEHOLDER_UNINITIALIZED_LIMIT_ID, <<"uninitialized limit">>). + -spec init_per_suite(config()) -> _. -init_per_suite(_Config) -> - _ = dmt_client:upsert({limit_config, limiter_mk_config_object_num(?LIMIT_TURNOVER_NUM_PAYTOOL_ID1)}), - _ = dmt_client:upsert({limit_config, limiter_mk_config_object_num(?LIMIT_TURNOVER_NUM_PAYTOOL_ID2)}), - _ = dmt_client:upsert({limit_config, limiter_mk_config_object_amount(?LIMIT_TURNOVER_AMOUNT_PAYTOOL_ID1)}), - _ = dmt_client:upsert({limit_config, limiter_mk_config_object_amount(?LIMIT_TURNOVER_AMOUNT_PAYTOOL_ID2)}), - _ = dmt_client:upsert({limit_config, limiter_mk_config_object_amount(?LIMIT_TURNOVER_AMOUNT_PAYTOOL_ID3)}), - _ = dmt_client:upsert({limit_config, limiter_mk_config_object_amount(?LIMIT_TURNOVER_AMOUNT_PAYTOOL_ID4)}), - ok. +init_per_suite(Config) -> + LimitsRevision = dmt_client:upsert([ + {limit_config, limiter_mk_config_object_num(?LIMIT_TURNOVER_NUM_PAYTOOL_ID1)}, + {limit_config, limiter_mk_config_object_num(?LIMIT_TURNOVER_NUM_PAYTOOL_ID2)}, + {limit_config, limiter_mk_config_object_amount(?LIMIT_TURNOVER_AMOUNT_PAYTOOL_ID1)}, + {limit_config, limiter_mk_config_object_amount(?LIMIT_TURNOVER_AMOUNT_PAYTOOL_ID2)}, + {limit_config, limiter_mk_config_object_amount(?LIMIT_TURNOVER_AMOUNT_PAYTOOL_ID3)}, + {limit_config, limiter_mk_config_object_amount(?LIMIT_TURNOVER_AMOUNT_PAYTOOL_ID4)} + ]), + [{'$limits_domain_revision', LimitsRevision} | Config]. --spec get_limit_amount(id(), withdrawal(), config()) -> integer(). -get_limit_amount(LimitID, Withdrawal, Config) -> - #limiter_Limit{amount = Amount} = get_limit(LimitID, Withdrawal, Config), +-spec get_limit_amount(id(), dmt_client:vsn(), withdrawal(), config()) -> integer(). +get_limit_amount(LimitID, Version, Withdrawal, Config) -> + #limiter_Limit{amount = Amount} = get_limit(LimitID, Version, Withdrawal, Config), Amount. --spec get_limit(id(), withdrawal(), config()) -> limit(). -get_limit(LimitId, Withdrawal, Config) -> +-spec get_limit(id(), dmt_client:vsn(), withdrawal(), config()) -> limit(). +get_limit(LimitId, Version, Withdrawal, Config) -> MarshaledWithdrawal = maybe_marshal_withdrawal(Withdrawal), Context = #limiter_LimitContext{ withdrawal_processing = #context_withdrawal_Context{ @@ -40,10 +44,18 @@ get_limit(LimitId, Withdrawal, Config) -> withdrawal = #context_withdrawal_Withdrawal{withdrawal = MarshaledWithdrawal} } }, - #domain_conf_VersionedObject{version = Version} = - dmt_client:checkout_versioned_object({'limit_config', #domain_LimitConfigRef{id = LimitId}}), - {ok, Limit} = ff_ct_limiter_client:get(LimitId, Version, Context, ct_helper:get_woody_ctx(Config)), - Limit. + maybe_uninitialized_limit(ff_ct_limiter_client:get(LimitId, Version, Context, ct_helper:get_woody_ctx(Config))). + +-spec maybe_uninitialized_limit({ok, _} | {exception, _}) -> _Limit. +maybe_uninitialized_limit({ok, Limit}) -> + Limit; +maybe_uninitialized_limit({exception, _}) -> + #limiter_Limit{ + id = ?PLACEHOLDER_UNINITIALIZED_LIMIT_ID, + amount = 0, + creation_time = undefined, + description = undefined + }. maybe_marshal_withdrawal(Withdrawal = #wthd_domain_Withdrawal{}) -> Withdrawal; diff --git a/apps/ff_transfer/test/ff_withdrawal_limits_SUITE.erl b/apps/ff_transfer/test/ff_withdrawal_limits_SUITE.erl index 901f4aa3..725398e4 100644 --- a/apps/ff_transfer/test/ff_withdrawal_limits_SUITE.erl +++ b/apps/ff_transfer/test/ff_withdrawal_limits_SUITE.erl @@ -179,7 +179,10 @@ limit_success(C) -> ?assertEqual(succeeded, await_final_withdrawal_status(WithdrawalID)), Withdrawal = get_withdrawal(WithdrawalID), ?assertEqual( - PreviousAmount + 1, ff_limiter_helper:get_limit_amount(?LIMIT_TURNOVER_NUM_PAYTOOL_ID1, Withdrawal, C) + PreviousAmount + 1, + ff_limiter_helper:get_limit_amount( + ?LIMIT_TURNOVER_NUM_PAYTOOL_ID1, ct_helper:cfg('$limits_domain_revision', C), Withdrawal, C + ) ). -spec limit_overflow(config()) -> test_return(). @@ -204,18 +207,23 @@ limit_overflow(C) -> %% we get final withdrawal status before we rollback limits so wait for it some amount of time ok = timer:sleep(500), Withdrawal = get_withdrawal(WithdrawalID), - ?assertEqual(PreviousAmount, ff_limiter_helper:get_limit_amount(?LIMIT_TURNOVER_NUM_PAYTOOL_ID2, Withdrawal, C)). + ?assertEqual( + PreviousAmount, + ff_limiter_helper:get_limit_amount( + ?LIMIT_TURNOVER_NUM_PAYTOOL_ID2, ct_helper:cfg('$limits_domain_revision', C), Withdrawal, C + ) + ). -spec limit_hold_currency_error(config()) -> test_return(). limit_hold_currency_error(C) -> - mock_limiter_trm_hold(?trm(1800), fun(_LimitChange, _Clock, _Context) -> + mock_limiter_trm_hold_batch(?trm(1800), fun(_LimitRequest, _Context) -> {exception, #limiter_InvalidOperationCurrency{currency = <<"RUB">>, expected_currency = <<"KEK">>}} end), limit_hold_error(C). -spec limit_hold_operation_error(config()) -> test_return(). limit_hold_operation_error(C) -> - mock_limiter_trm_hold(?trm(1800), fun(_LimitChange, _Clock, _Context) -> + mock_limiter_trm_hold_batch(?trm(1800), fun(_LimitRequest, _Context) -> {exception, #limiter_OperationContextNotSupported{ context_type = {withdrawal_processing, #limiter_config_LimitContextTypeWithdrawalProcessing{}} }} @@ -224,14 +232,14 @@ limit_hold_operation_error(C) -> -spec limit_hold_paytool_error(config()) -> test_return(). limit_hold_paytool_error(C) -> - mock_limiter_trm_hold(?trm(1800), fun(_LimitChange, _Clock, _Context) -> + mock_limiter_trm_hold_batch(?trm(1800), fun(_LimitRequest, _Context) -> {exception, #limiter_PaymentToolNotSupported{payment_tool = <<"unsupported paytool">>}} end), limit_hold_error(C). -spec limit_hold_error_two_routes_failure(config()) -> test_return(). limit_hold_error_two_routes_failure(C) -> - mock_limiter_trm_call(?trm(2000), fun(_LimitChange, _Clock, _Context) -> + mock_limiter_trm_call(?trm(2000), fun(_LimitRequest, _Context) -> {exception, #limiter_PaymentToolNotSupported{payment_tool = <<"unsupported paytool">>}} end), %% See `?ruleset(?PAYINST1_ROUTING_POLICIES + 18)` with two candidates in `ct_payment_system:domain_config/1`. @@ -255,27 +263,25 @@ limit_hold_error_two_routes_failure(C) -> -define(LIMITER_REQUEST(Func, TerminalRef), { {limproto_limiter_thrift, 'Limiter'}, Func, - {_LimitChange, _Clock, #limiter_LimitContext{ + {_LimitRequest, #limiter_LimitContext{ withdrawal_processing = #context_withdrawal_Context{ withdrawal = #context_withdrawal_Withdrawal{route = #base_Route{terminal = TerminalRef}} } }} }). -mock_limiter_trm_hold(ExpectTerminalRef, ReturnFunc) -> - ok = meck:expect(ff_woody_client, call, fun - (limiter, {_, _, Args} = ?LIMITER_REQUEST('Hold', TerminalRef)) when TerminalRef =:= ExpectTerminalRef -> - apply(ReturnFunc, tuple_to_list(Args)); - (Service, Request) -> - meck:passthrough([Service, Request]) - end). + +-define(MOCKED_LIMITER_FUNC(CallFunc, ExpectTerminalRef, ReturnFunc), fun + (limiter, {_, _, Args} = ?LIMITER_REQUEST(CallFunc, TerminalRef)) when TerminalRef =:= ExpectTerminalRef -> + apply(ReturnFunc, tuple_to_list(Args)); + (Service, Request) -> + meck:passthrough([Service, Request]) +end). + +mock_limiter_trm_hold_batch(ExpectTerminalRef, ReturnFunc) -> + ok = meck:expect(ff_woody_client, call, ?MOCKED_LIMITER_FUNC('HoldBatch', ExpectTerminalRef, ReturnFunc)). mock_limiter_trm_call(ExpectTerminalRef, ReturnFunc) -> - ok = meck:expect(ff_woody_client, call, fun - (limiter, {_, _, Args} = ?LIMITER_REQUEST(_Func, TerminalRef)) when TerminalRef =:= ExpectTerminalRef -> - apply(ReturnFunc, tuple_to_list(Args)); - (Service, Request) -> - meck:passthrough([Service, Request]) - end). + ok = meck:expect(ff_woody_client, call, ?MOCKED_LIMITER_FUNC(_, ExpectTerminalRef, ReturnFunc)). limit_hold_error(C) -> Cash = {800800, <<"RUB">>}, @@ -315,7 +321,10 @@ choose_provider_without_limit_overflow(C) -> ?assertEqual(succeeded, await_final_withdrawal_status(WithdrawalID)), Withdrawal = get_withdrawal(WithdrawalID), ?assertEqual( - PreviousAmount + 1, ff_limiter_helper:get_limit_amount(?LIMIT_TURNOVER_NUM_PAYTOOL_ID2, Withdrawal, C) + PreviousAmount + 1, + ff_limiter_helper:get_limit_amount( + ?LIMIT_TURNOVER_NUM_PAYTOOL_ID2, ct_helper:cfg('$limits_domain_revision', C), Withdrawal, C + ) ). -spec provider_limits_exhaust_orderly(config()) -> test_return(). @@ -343,7 +352,12 @@ provider_limits_exhaust_orderly(C) -> ok = ff_withdrawal_machine:create(WithdrawalParams1, ff_entity_context:new()), ?assertEqual(succeeded, await_final_withdrawal_status(WithdrawalID1)), Withdrawal1 = get_withdrawal(WithdrawalID1), - ?assertEqual(902000, ff_limiter_helper:get_limit_amount(?LIMIT_TURNOVER_AMOUNT_PAYTOOL_ID1, Withdrawal1, C)), + ?assertEqual( + 902000, + ff_limiter_helper:get_limit_amount( + ?LIMIT_TURNOVER_AMOUNT_PAYTOOL_ID1, ct_helper:cfg('$limits_domain_revision', C), Withdrawal1, C + ) + ), %% Second withdrawal goes to limit 2 as limit 1 doesn't have enough and spents all its amount WithdrawalID2 = generate_id(), @@ -358,7 +372,12 @@ provider_limits_exhaust_orderly(C) -> ok = ff_withdrawal_machine:create(WithdrawalParams2, ff_entity_context:new()), ?assertEqual(succeeded, await_final_withdrawal_status(WithdrawalID2)), Withdrawal2 = get_withdrawal(WithdrawalID2), - ?assertEqual(903000, ff_limiter_helper:get_limit_amount(?LIMIT_TURNOVER_AMOUNT_PAYTOOL_ID2, Withdrawal2, C)), + ?assertEqual( + 903000, + ff_limiter_helper:get_limit_amount( + ?LIMIT_TURNOVER_AMOUNT_PAYTOOL_ID2, ct_helper:cfg('$limits_domain_revision', C), Withdrawal2, C + ) + ), %% Third withdrawal goes to limit 1 and spents all its amount WithdrawalID3 = generate_id(), @@ -373,7 +392,12 @@ provider_limits_exhaust_orderly(C) -> ok = ff_withdrawal_machine:create(WithdrawalParams3, ff_entity_context:new()), ?assertEqual(succeeded, await_final_withdrawal_status(WithdrawalID3)), Withdrawal3 = get_withdrawal(WithdrawalID3), - ?assertEqual(1804000, ff_limiter_helper:get_limit_amount(?LIMIT_TURNOVER_AMOUNT_PAYTOOL_ID1, Withdrawal3, C)), + ?assertEqual( + 1804000, + ff_limiter_helper:get_limit_amount( + ?LIMIT_TURNOVER_AMOUNT_PAYTOOL_ID1, ct_helper:cfg('$limits_domain_revision', C), Withdrawal3, C + ) + ), %% Last withdrawal can't find route cause all limits are drained WithdrawalID = generate_id(), @@ -498,7 +522,7 @@ get_limit_withdrawal(Cash, WalletID, DestinationID) -> get_limit_amount(Cash, WalletID, DestinationID, LimitID, C) -> Withdrawal = get_limit_withdrawal(Cash, WalletID, DestinationID), - ff_limiter_helper:get_limit_amount(LimitID, Withdrawal, C). + ff_limiter_helper:get_limit_amount(LimitID, ct_helper:cfg('$limits_domain_revision', C), Withdrawal, C). get_destination_resource(DestinationID) -> {ok, DestinationMachine} = ff_destination_machine:get(DestinationID), diff --git a/compose.yaml b/compose.yaml index 951d661e..668c8b9f 100644 --- a/compose.yaml +++ b/compose.yaml @@ -52,7 +52,7 @@ services: retries: 10 limiter: - image: ghcr.io/valitydev/limiter:sha-920d6ac + image: ghcr.io/valitydev/limiter:sha-2271094 command: /opt/limiter/bin/limiter foreground depends_on: machinegun: