From 4f486a9d51bada3a1403273b1b983f783392ff53 Mon Sep 17 00:00:00 2001 From: Thales Macedo Garitezi Date: Thu, 6 Jun 2024 16:53:53 -0300 Subject: [PATCH] chore: refactor azure blob connector to use new `erlazure` without `gen_server` Depends on https://github.com/dkataskin/erlazure/pull/43 being merged and then synced to our fork. --- ...qx_bridge_azure_blob_storage_connector.erl | 191 ++++++------------ .../emqx_bridge_azure_blob_storage_SUITE.erl | 11 +- 2 files changed, 63 insertions(+), 139 deletions(-) diff --git a/apps/emqx_bridge_azure_blob_storage/src/emqx_bridge_azure_blob_storage_connector.erl b/apps/emqx_bridge_azure_blob_storage/src/emqx_bridge_azure_blob_storage_connector.erl index cdd25ad7d62..c0f4a7e6955 100644 --- a/apps/emqx_bridge_azure_blob_storage/src/emqx_bridge_azure_blob_storage_connector.erl +++ b/apps/emqx_bridge_azure_blob_storage/src/emqx_bridge_azure_blob_storage_connector.erl @@ -32,18 +32,6 @@ on_batch_query/3 ]). -%% `ecpool_worker' API --export([ - connect/1, - do_create_append_blob/3, - do_create_block_blob/3, - do_append_data/5, - do_put_block_list/4, - do_put_block_blob/4, - do_health_check/1, - do_list_blobs/2 -]). - %% `emqx_connector_aggreg_delivery' API -export([ init_transfer_state/2, @@ -71,7 +59,7 @@ }. -type connector_state() :: #{ - pool_name := connector_resource_id(), + driver_state := driver_state(), installed_actions := #{action_resource_id() => action_state()} }. @@ -124,7 +112,7 @@ -type query() :: {_Tag :: channel_id(), _Data :: emqx_jsonish:t()}. --type pool_name() :: connector_resource_id(). +-type driver_state() :: _. -type transfer_opts() :: #{ upload_options := #{ @@ -133,7 +121,7 @@ container := string(), min_block_size := pos_integer(), max_block_size := pos_integer(), - pool := connector_resource_id() + driver_state := driver_state() } }. @@ -148,7 +136,7 @@ min_block_size := pos_integer(), next_block := queue:queue(iolist()), num_blocks := non_neg_integer(), - pool := pool_name(), + driver_state := driver_state(), started := boolean() }. @@ -162,38 +150,32 @@ callback_mode() -> -spec on_start(connector_resource_id(), connector_config()) -> {ok, connector_state()} | {error, _Reason}. -on_start(ConnResId, ConnConfig) -> +on_start(_ConnResId, ConnConfig) -> #{ account_name := AccountName, account_key := AccountKey } = ConnConfig, Endpoint = maps:get(endpoint, ConnConfig, undefined), - ClientOpts = [ - {account_name, AccountName}, - {account_key, AccountKey}, - {endpoint, Endpoint} - ], - case emqx_resource_pool:start(ConnResId, ?MODULE, ClientOpts) of - ok -> - State = #{ - pool_name => ConnResId, - installed_actions => #{} - }, - {ok, State}; - {error, Reason} -> - {error, Reason} - end. + {ok, DriverState} = erlazure:new(#{ + account => AccountName, + key => AccountKey, + endpoint => Endpoint + }), + State = #{ + driver_state => DriverState, + installed_actions => #{} + }, + {ok, State}. -spec on_stop(connector_resource_id(), connector_state()) -> ok. -on_stop(ConnResId, _ConnState) -> - Res = emqx_resource_pool:stop(ConnResId), - ?tp(azure_blob_storage_stop, #{instance_id => ConnResId}), - Res. +on_stop(_ConnResId, _ConnState) -> + ?tp(azure_blob_storage_stop, #{instance_id => _ConnResId}), + ok. -spec on_get_status(connector_resource_id(), connector_state()) -> ?status_connected | ?status_disconnected. -on_get_status(ConnResId, _ConnState) -> - health_check(ConnResId). +on_get_status(_ConnResId, _ConnState = #{driver_state := DriverState}) -> + health_check(DriverState). -spec on_add_channel( connector_resource_id(), @@ -236,22 +218,22 @@ on_get_channels(ConnResId) -> ) -> ?status_connected | ?status_disconnected. on_get_channel_status( - ConnResId, + _ConnResId, ActionResId, - _ConnectorState = #{installed_actions := InstalledActions} + ConnectorState = #{installed_actions := InstalledActions} ) when is_map_key(ActionResId, InstalledActions) -> #{ActionResId := ActionConfig} = InstalledActions, - channel_status(ActionConfig, ConnResId); + channel_status(ActionConfig, ConnectorState); on_get_channel_status(_ConnResId, _ActionResId, _ConnState) -> ?status_disconnected. -spec on_query(connector_resource_id(), query(), connector_state()) -> {ok, _Result} | {error, _Reason}. -on_query(ConnResId, {Tag, Data}, #{installed_actions := InstalledActions}) -> +on_query(ConnResId, {Tag, Data}, #{installed_actions := InstalledActions} = ConnState) -> case maps:get(Tag, InstalledActions, undefined) of ChannelState = #{mode := direct} -> ?tp(azure_blob_storage_bridge_on_query_enter, #{mode => direct}), - run_direct_transfer(Data, ConnResId, Tag, ChannelState); + run_direct_transfer(Data, ConnResId, Tag, ChannelState, ConnState); ChannelState = #{mode := aggregated} -> ?tp(azure_blob_storage_bridge_on_query_enter, #{mode => aggregated}), run_aggregated_transfer([Data], ChannelState); @@ -271,67 +253,27 @@ on_batch_query(_ConnResId, [{Tag, Data0} | Rest], #{installed_actions := Install end. %%------------------------------------------------------------------------------ -%% `ecpool_worker' API +%% Driver calls %%------------------------------------------------------------------------------ -connect(Opts0) -> - #{ - account_name := AccountName, - account_key := AccountKey, - endpoint := Endpoint - } = maps:from_list(Opts0), - erlazure:start(#{account => AccountName, key => AccountKey, endpoint => Endpoint}). - -do_create_append_blob(Worker, Container, Blob) -> - %% TODO: check container type before setting content type - Opts = [{content_type, "text/csv"}], - erlazure:put_append_blob(Worker, Container, Blob, Opts, infinity). - -create_block_blob(Pool, Container, Blob) -> - ecpool:pick_and_do(Pool, {?MODULE, do_create_block_blob, [Container, Blob]}, no_handover). - -do_create_block_blob(Worker, Container, Blob) -> +do_create_block_blob(DriverState, Container, Blob) -> %% TODO: check container type before setting content type Opts = [{content_type, "text/csv"}], - erlazure:put_block_blob(Worker, Container, Blob, <<>>, Opts, infinity). - -append_data(Pool, Container, Blob, BlockId, IOData) -> - ecpool:pick_and_do( - Pool, {?MODULE, do_append_data, [Container, Blob, BlockId, IOData]}, no_handover - ). + erlazure:put_block_blob(DriverState, Container, Blob, <<>>, Opts). -do_append_data(Worker, Container, Blob, BlockId, IOData) -> - erlazure:put_block(Worker, Container, Blob, BlockId, IOData, [], infinity). +do_append_data(DriverState, Container, Blob, BlockId, IOData) -> + erlazure:put_block(DriverState, Container, Blob, BlockId, IOData, []). -put_block_list(Pool, Container, Blob, BlockRefs) -> - ecpool:pick_and_do( - Pool, {?MODULE, do_put_block_list, [Container, Blob, BlockRefs]}, no_handover - ). - -do_put_block_list(Worker, Container, Blob, BlockRefs) -> +do_put_block_list(DriverState, Container, Blob, BlockRefs) -> %% TODO: check container type before setting content type Opts = [{req_opts, [{headers, [{"x-ms-blob-content-type", "text/csv"}]}]}], - erlazure:put_block_list(Worker, Container, Blob, BlockRefs, Opts, infinity). - -put_block_blob(Pool, Container, Blob, IOData) -> - ecpool:pick_and_do(Pool, {?MODULE, do_put_block_blob, [Container, Blob, IOData]}, no_handover). - -do_put_block_blob(Worker, Container, Blob, IOData) -> - erlazure:put_block_blob(Worker, Container, Blob, IOData, [], infinity). - -do_health_check(Worker) -> - case erlazure:list_containers(Worker, [], infinity) of - {error, _} -> - error; - {L, _} when is_list(L) -> - ok - end. + erlazure:put_block_list(DriverState, Container, Blob, BlockRefs, Opts). -list_blobs(Pool, Container) -> - ecpool:pick_and_do(Pool, {?MODULE, do_list_blobs, [Container]}, no_handover). +do_put_block_blob(DriverState, Container, Blob, IOData) -> + erlazure:put_block_blob(DriverState, Container, Blob, IOData, []). -do_list_blobs(Worker, Container) -> - case erlazure:list_blobs(Worker, Container, [], infinity) of +do_list_blobs(DriverState, Container) -> + case erlazure:list_blobs(DriverState, Container, []) of {error, _} -> error; {L, _} when is_list(L) -> @@ -352,7 +294,7 @@ init_transfer_state(Buffer, Opts) -> container := Container, max_block_size := MaxBlockSize, min_block_size := MinBlockSize, - pool := Pool + driver_state := DriverState } } = Opts, Blob = mk_blob_name_key(Buffer, ActionName, BlobTemplate), @@ -365,7 +307,7 @@ init_transfer_state(Buffer, Opts) -> min_block_size => MinBlockSize, next_block => queue:new(), num_blocks => 0, - pool => Pool, + driver_state => DriverState, started => false }. @@ -401,14 +343,14 @@ process_append(IOData, TransferState0) -> {ok, transfer_state()} | {error, term()}. process_write(TransferState0 = #{started := false}) -> #{ - pool := Pool, + driver_state := DriverState, blob := Blob, container := Container } = TransferState0, %% TODO %% Possible optimization: if the whole buffer fits the 5000 MiB `put_block_blob' %% limit, we could upload the whole thing here. - case create_block_blob(Pool, Container, Blob) of + case do_create_block_blob(DriverState, Container, Blob) of {ok, _} -> TransferState = TransferState0#{started := true}, process_write(TransferState); @@ -432,9 +374,9 @@ do_process_write(IOData, TransferState0 = #{started := true}) -> blob := Blob, container := Container, num_blocks := NumBlocks, - pool := Pool + driver_state := DriverState } = TransferState0, - case append_data(Pool, Container, Blob, block_id(NumBlocks), IOData) of + case do_append_data(DriverState, Container, Blob, block_id(NumBlocks), IOData) of {ok, _} -> TransferState = TransferState0#{num_blocks := NumBlocks + 1}, process_write(TransferState); @@ -451,7 +393,7 @@ process_complete(TransferState) -> buffer_size := BufferSize, container := Container, num_blocks := NumBlocks0, - pool := Pool + driver_state := DriverState } = TransferState, %% Flush any left-over data NumBlocks = @@ -463,7 +405,7 @@ process_complete(TransferState) -> NumBlocks0 end, BlockRefs = [{block_id(N), latest} || N <- lists:seq(0, NumBlocks - 1)], - case put_block_list(Pool, Container, Blob, BlockRefs) of + case do_put_block_list(DriverState, Container, Blob, BlockRefs) of {ok, _} -> {ok, #{num_blocks => NumBlocks}}; {error, Reason} -> @@ -524,7 +466,7 @@ install_action(#{parameters := #{mode := direct}} = ActionConfig, _ConnState) -> max_block_size => MaxBlockSize }; install_action(#{parameters := #{mode := aggregated}} = ActionConfig, ConnState) -> - #{pool_name := Pool} = ConnState, + #{driver_state := DriverState} = ConnState, #{ bridge_name := Name, parameters := #{ @@ -554,7 +496,7 @@ install_action(#{parameters := #{mode := aggregated}} = ActionConfig, ConnState) container => ContainerName, max_block_size => MaxBlockSize, min_block_size => MinBlockSize, - pool => Pool + driver_state => DriverState }, DeliveryOpts = #{ callback_module => ?MODULE, @@ -584,7 +526,8 @@ stop_action(#{on_stop := {M, F, A}}) -> stop_action(_) -> ok. -run_direct_transfer(Data, ConnResId, ActionResId, ActionState) -> +run_direct_transfer(Data, ConnResId, ActionResId, ActionState, ConnState) -> + #{driver_state := DriverState} = ConnState, #{ container := ContainerTemplate, blob := BlobTemplate, @@ -608,7 +551,7 @@ run_direct_transfer(Data, ConnResId, ActionResId, ActionState) -> false -> ok end, - case put_block_blob(ConnResId, Container, Blob, Content) of + case do_put_block_blob(DriverState, Container, Blob, Content) of {ok, created} -> ?tp(azure_blob_storage_bridge_connector_upload_ok, #{instance_id => ConnResId}), ok; @@ -681,40 +624,26 @@ render_content(Template, Data) -> iolist_to_string(IOList) -> unicode:characters_to_list(IOList). -channel_status(#{mode := direct}, _ConnResId) -> +channel_status(#{mode := direct}, _ConnState) -> %% There's nothing in particular to check for in this mode; the connector health check %% already verifies that we're able to use the client to list containers. ?status_connected; -channel_status(#{mode := aggregated} = ActionState, ConnResId) -> +channel_status(#{mode := aggregated} = ActionState, ConnState) -> + #{driver_state := DriverState} = ConnState, #{container := Container, aggreg_id := AggregId} = ActionState, %% NOTE: This will effectively trigger uploads of buffers yet to be uploaded. Timestamp = erlang:system_time(second), ok = emqx_connector_aggregator:tick(AggregId, Timestamp), - ok = check_container_accessible(ConnResId, Container), + ok = check_container_accessible(DriverState, Container), ok = check_aggreg_upload_errors(AggregId), ?status_connected. -health_check(ConnResId) -> - case - emqx_resource_pool:health_check_workers( - ConnResId, - fun ?MODULE:do_health_check/1, - emqx_resource_pool:health_check_timeout(), - #{return_values => true} - ) - of - {ok, []} -> - ?status_disconnected; - {ok, Values} -> - AllOk = lists:all(fun(S) -> S =:= ok end, Values), - case AllOk of - true -> - ?status_connected; - false -> - ?status_disconnected - end; +health_check(DriverState) -> + case erlazure:list_containers(DriverState, []) of {error, _} -> - ?status_disconnected + ?status_disconnected; + {L, _} when is_list(L) -> + ?status_connected end. map_error({failed_connect, _} = Reason) -> @@ -734,8 +663,8 @@ check_aggreg_upload_errors(AggregId) -> ok end. -check_container_accessible(Pool, Container) -> - list_blobs(Pool, Container). +check_container_accessible(DriverState, Container) -> + do_list_blobs(DriverState, Container). block_id(N) -> NumDigits = 32, diff --git a/apps/emqx_bridge_azure_blob_storage/test/emqx_bridge_azure_blob_storage_SUITE.erl b/apps/emqx_bridge_azure_blob_storage/test/emqx_bridge_azure_blob_storage_SUITE.erl index 9e165c8e889..577fa502534 100644 --- a/apps/emqx_bridge_azure_blob_storage/test/emqx_bridge_azure_blob_storage_SUITE.erl +++ b/apps/emqx_bridge_azure_blob_storage/test/emqx_bridge_azure_blob_storage_SUITE.erl @@ -105,7 +105,7 @@ init_per_testcase(TestCase, Config0) -> parameters => #{container => ContainerName} }) end, - Client = start_control_client(Endpoint), + Client = new_control_driver(Endpoint), ct:pal("container name: ~s", [ContainerName]), ok = ensure_new_container(ContainerName, Client), Config = [ @@ -123,14 +123,12 @@ init_per_testcase(TestCase, Config0) -> Config. end_per_testcase(_Testcase, Config) -> - Client = ?config(client, Config), ProxyHost = ?config(proxy_host, Config), ProxyPort = ?config(proxy_port, Config), emqx_common_test_helpers:reset_proxy(ProxyHost, ProxyPort), emqx_bridge_v2_testlib:delete_all_bridges_and_connectors(), emqx_common_test_helpers:call_janitor(), ok = snabbkaffe:stop(), - stop_control_client(Client), ok. direct_action_cases() -> @@ -144,17 +142,14 @@ direct_action_cases() -> %% Helper fns %%------------------------------------------------------------------------------ -start_control_client(Endpoint) -> - {ok, Client} = erlazure:start(#{ +new_control_driver(Endpoint) -> + {ok, Client} = erlazure:new(#{ endpoint => Endpoint, account => binary_to_list(?ACCOUNT_NAME_BIN), key => binary_to_list(?ACCOUNT_KEY_BIN) }), Client. -stop_control_client(Client) -> - gen_server:stop(Client). - container_name(Name) -> IOList = re:replace(bin(Name), <<"[^a-z0-9-]">>, <<"-">>, [global]), iolist_to_binary(IOList).