Skip to content

Commit

Permalink
test: add case for a few bridges
Browse files Browse the repository at this point in the history
  • Loading branch information
thalesmg committed Oct 14, 2024
1 parent 5358162 commit 3896cb9
Show file tree
Hide file tree
Showing 2 changed files with 136 additions and 21 deletions.
1 change: 1 addition & 0 deletions test/data/bridges1.json
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"version":"4.4","rules":[{"id":"rule:581902","rawsql":"SELECT\n\n *\n\nFROM\n\n \"t/redis\"\n","actions":[{"id":"data_to_redis_1728934295955661528","name":"data_to_redis","fallbacks":[],"args":{"cmd":"HSET msgs:${clientid} ${payload}","$resource":"resource:642569"}},{"id":"data_to_redis_1728934295955690913","name":"data_to_redis","fallbacks":[],"args":{"cmd":"HSET k v","$resource":"resource:719684"}},{"id":"data_to_redis_1728934295955714014","name":"data_to_redis","fallbacks":[],"args":{"cmd":"HSET k v","$resource":"resource:916990"}}],"enabled":false,"description":""},{"id":"rule:075972","rawsql":"SELECT\n\n *\n\nFROM\n\n \"t/debug\"\n","actions":[{"id":"inspect_1728928966144399180","name":"inspect","fallbacks":[],"args":{}}],"enabled":true,"description":""},{"id":"rule:856633","rawsql":"SELECT\n\n *\n\nFROM\n\n \"t/mqtt\"\n","actions":[{"id":"data_to_mqtt_broker_1728929031495770311","name":"data_to_mqtt_broker","fallbacks":[],"args":{"qos":"inherited_from_source_msg","payload_tmpl":"","forward_topic":"forward/t","$resource":"resource:624957"}}],"enabled":true,"description":""},{"id":"rule:886709","rawsql":"SELECT\n\n *\n\nFROM\n\n \"t/kafka\"\n","actions":[{"id":"data_to_kafka_1728928713702878955","name":"data_to_kafka","fallbacks":[],"args":{"type":"async","topic":"test-topic-one-partition","strategy":"random","segments_bytes":"100MB","required_acks":"all_isr","payload_tmpl":"","partition_count_refresh_interval":"60s","message_accumulation_size":0,"message_accumulation_keep_msg_order":true,"message_accumulation_interval":100,"message_accumulation_drop_factor":20,"max_total_bytes":"2GB","key":"none","kafka_header_value_encode_mode":"NONE","kafka_ext_headers":[],"highmem_drop":false,"cache_mode":"Memory","$resource":"resource:959424"}}],"enabled":false,"description":""},{"id":"rule:284238","rawsql":"SELECT\n\n *\n\nFROM\n\n \"t/republish\"\n","actions":[{"id":"republish_1728928923290011267","name":"republish","fallbacks":[],"args":{"user_properties_template":[],"target_topic":"repub/to/${clientid}","target_retain":false,"target_qos":0,"payload_tmpl":"${payload}","mqtt_properties_template":[]}}],"enabled":true,"description":""},{"id":"rule:434126","rawsql":"SELECT\n\n *\n\nFROM\n\n \"t/postgres\"\n","actions":[{"id":"data_to_pgsql_1728928784025005009","name":"data_to_pgsql","fallbacks":[],"args":{"sync_timeout":5000,"sql_insert_null":false,"sql":"insert into postgres_table(col) values (payload)","insert_mode":"async","enable_batch":true,"batch_time":10,"batch_size":100,"batch_pool_size":4,"$resource":"resource:085726"}}],"enabled":true,"description":""}],"resources":[{"id":"resource:916990","type":"backend_redis_cluster","config":{"cacertfile":{"filename":"","file":""},"certfile":{"filename":"","file":""},"database":0,"keyfile":{"filename":"","file":""},"password":"","pool_size":8,"servers":"redis:6379","ssl":true,"verify":false},"created_at":1728933059219,"description":""},{"id":"resource:085726","type":"backend_pgsql","config":{"auto_reconnect":true,"cacertfile":{"filename":"","file":""},"certfile":{"filename":"","file":""},"database":"postgres","keyfile":{"filename":"","file":""},"password":"postgres","pool_size":8,"server":"postgres:5432","ssl":false,"username":"postgres","verify":false},"created_at":1728928660046,"description":""},{"id":"resource:624957","type":"bridge_mqtt","config":{"address":"127.0.0.1:1883","append":true,"bridge_mode":false,"cacertfile":{"filename":"","file":""},"certfile":{"filename":"","file":""},"ciphers":"ECDHE-ECDSA-AES256-GCM-SHA384,ECDHE-RSA-AES256-GCM-SHA384,ECDHE-ECDSA-AES256-SHA384,ECDHE-RSA-AES256-SHA384,ECDHE-ECDSA-DES-CBC3-SHA,ECDH-ECDSA-AES256-GCM-SHA384,ECDH-RSA-AES256-GCM-SHA384,ECDH-ECDSA-AES256-SHA384,ECDH-RSA-AES256-SHA384,DHE-DSS-AES256-GCM-SHA384,DHE-DSS-AES256-SHA256,AES256-GCM-SHA384,AES256-SHA256,ECDHE-ECDSA-AES128-GCM-SHA256,ECDHE-RSA-AES128-GCM-SHA256,ECDHE-ECDSA-AES128-SHA256,ECDHE-RSA-AES128-SHA256,ECDH-ECDSA-AES128-GCM-SHA256,ECDH-RSA-AES128-GCM-SHA256,ECDH-ECDSA-AES128-SHA256,ECDH-RSA-AES128-SHA256,DHE-DSS-AES128-GCM-SHA256,DHE-DSS-AES128-SHA256,AES128-GCM-SHA256,AES128-SHA256,ECDHE-ECDSA-AES256-SHA,ECDHE-RSA-AES256-SHA,DHE-DSS-AES256-SHA,ECDH-ECDSA-AES256-SHA,ECDH-RSA-AES256-SHA,AES256-SHA,ECDHE-ECDSA-AES128-SHA,ECDHE-RSA-AES128-SHA,DHE-DSS-AES128-SHA,ECDH-ECDSA-AES128-SHA,ECDH-RSA-AES128-SHA,AES128-SHA","clientid":"mybridge","disk_cache":"off","keepalive":"60s","keyfile":{"filename":"","file":""},"mountpoint":"","password":"","pool_size":8,"proto_ver":"mqttv5","reconnect_interval":"30s","retry_interval":"20s","ssl":false,"username":"","verify":false},"created_at":1728929001020,"description":""},{"id":"resource:959424","type":"bridge_kafka","config":{"cacertfile":{"filename":"","file":""},"certfile":{"filename":"","file":""},"compression":"no_compression","keyfile":{"filename":"","file":""},"max_batch_bytes":"900KB","min_metadata_refresh_interval":"3s","query_api_versions":true,"send_buffer":"1024KB","server_name_indication":"auto","servers":"kafka-1.emqx.net:9092","ssl":false,"sync_timeout":"3s","tcp_keepalive":"0s","verify":false},"created_at":1728928633000,"description":""},{"id":"resource:642569","type":"backend_redis_single","config":{"auto_reconnect":true,"cacertfile":{"filename":"","file":""},"certfile":{"filename":"","file":""},"database":0,"keyfile":{"filename":"","file":""},"password":"","pool_size":8,"server":"redis:6379","ssl":false,"verify":false},"created_at":1728928675127,"description":""},{"id":"resource:719684","type":"backend_redis_sentinel","config":{"auto_reconnect":true,"cacertfile":{"filename":"","file":""},"certfile":{"filename":"","file":""},"database":0,"keyfile":{"filename":"","file":""},"password":"","pool_size":8,"sentinel":"mysentinel","servers":"redis:26379","ssl":false,"verify":false},"created_at":1728933029402,"description":""}],"blacklist":[{"who":{"username":"banned_username2"},"by":"user","reason":"banned2","at":1728916029,"until":1729134000}],"apps":[{"id":"admin","secret":"public","name":"Default","desc":"Application user","status":true,"expired":"undefined"},{"id":"app_id","secret":"4mVZvVT9CnC6Z3AYdk9C07Ecz9AuBCLblb43kk69BcxbBhP","name":"app_name","desc":"app comment","status":true,"expired":"undefined"}],"users":[{"username":"admin","password":"kY+vJrDS/DCvsAPMYG09iAWLdKY=","tags":{"role":"administrator","desc":"admin"}},{"username":"dash_user","password":"G9wNtibQhQoBqfbfuXGADBhZQgE=","tags":{"role":"administrator","desc":"dash comment"}}],"auth_mnesia":[{"login":"authn_cid1","type":"clientid","password":"6v6dezYyY2EzNGQzYWYyZGQ1NjQyYjg4ZjRlMzQ3OGYzY2IzZDU4Y2I2NmI3ZTg1NGI3NjcyNmQ0ODE4ZTE3YTNhNmM=","created_at":1728915837583}],"acl_mnesia":[{"type":"clientid","type_value":"authz_cid1","topic":"t/1","action":"pub","access":"allow","created_at":1728915852861},{"type":"clientid","type_value":"authz_cid1","topic":"t/1","action":"sub","access":"allow","created_at":1728915852861}],"modules":[{"id":"module:internal_acl","type":"internal_acl","config":{"acl_rule_file":{"filename":"acl.conf","file":"%%--------------------------------------------------------------------\n%% [ACL](https://docs.emqx.io/broker/v3/en/config.html)\n%%\n%% -type(who() :: all | binary() |\n%% {ipaddr, esockd_access:cidr()} |\n%% {ipaddrs, [esockd_access:cidr()]} |\n%% {client, binary()} |\n%% {user, binary()}).\n%%\n%% -type(access() :: subscribe | publish | pubsub).\n%%\n%% -type(topic() :: binary()).\n%%\n%% -type(rule() :: {allow, all} |\n%% {allow, who(), access(), list(topic())} |\n%% {deny, all} |\n%% {deny, who(), access(), list(topic())}).\n%%--------------------------------------------------------------------\n\n{allow, {user, \"dashboard\"}, subscribe, [\"$SYS/#\"]}.\n\n{allow, {ipaddr, \"127.0.0.1\"}, pubsub, [\"$SYS/#\", \"#\"]}.\n\n{deny, all, subscribe, [\"$SYS/#\", {eq, \"#\"}, {eq, \"+/#\"}]}.\n\n{allow, all}.\n\n"}},"enabled":true,"created_at":{"updated_at_ns":1728915241386230200,"created_at_ns":1728915241386230200},"description":""},{"id":"module:presence","type":"presence","config":{"qos":0},"enabled":true,"created_at":{"updated_at_ns":1728915241386260349,"created_at_ns":1728915241386260349},"description":""},{"id":"module:recon","type":"recon","config":{},"enabled":true,"created_at":{"updated_at_ns":1728915241386298494,"created_at_ns":1728915241386298494},"description":""},{"id":"module:mnesia_authentication","type":"mnesia_authentication","config":{"password_hash":"sha256","max_acls_for_each_login":0},"enabled":true,"created_at":{"updated_at_ns":1728915819392200802,"created_at_ns":1728915819392200802},"description":""},{"id":"module:redis_authentication","type":"redis_authentication","config":{"verify":false,"type":"single","super_cmd":"HGET mqtt_user:%u is_superuser","ssl":false,"server":"redis:6379","sentinel":"","query_timeout":"5s","pool_size":8,"password_hash":"plain","password":"","keyfile":{"filename":"","file":""},"database":0,"certfile":{"filename":"","file":""},"cacertfile":{"filename":"","file":""},"auto_reconnect":true,"auth_cmd":"HMGET mqtt_user:%u password","acl_cmd":"HGETALL mqtt_acl:%u"},"enabled":true,"created_at":{"updated_at_ns":1728916225117110350,"created_at_ns":1728916225117110350},"description":""},{"id":"module:pgsql_authentication","type":"pgsql_authentication","config":{"verify":false,"user":"postgres","super_query":"select is_superuser from mqtt_user where username = '%u' limit 1","ssl":false,"server":"postgres:5432","query_timeout":"5s","pool_size":8,"password_hash":"sha256","password":"postgres","keyfile":{"filename":"","file":""},"database":"postgres","certfile":{"filename":"","file":""},"cacertfile":{"filename":"","file":""},"auto_reconnect":true,"auth_query":"select password from mqtt_user where username = '%u' limit 1","acl_query":"select allow, ipaddr, username, clientid, access, topic from mqtt_acl where ipaddr = '%a' or username = '%u' or username = '$all' or clientid = '%c'"},"enabled":true,"created_at":{"updated_at_ns":1728916353614061576,"created_at_ns":1728916353614061576},"description":""},{"id":"module:retainer","type":"retainer","config":{"storage_type":"ram","max_retained_messages":0,"max_payload_size":"1MB","expiry_interval":0},"enabled":true,"created_at":{"updated_at_ns":1728916693575495289,"created_at_ns":1728915241386322176},"description":""}],"schemas":[],"configs":[],"listeners_state":[],"date":"2024-10-14 19:31:44"}
156 changes: 135 additions & 21 deletions test/scripts/test-convert-and-load.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ defmodule TH do
require Logger

@container_name "emqx-data-converter"
# exposed by docker
@api_port 48083

def converter_path() do
Path.absname("./emqx_data_converter")
Expand Down Expand Up @@ -92,7 +94,14 @@ defmodule TH do
# stupid and ugly hack because, for unknown reasons, `emqx start` hangs for ~ 62
# seconds when running before starting locally, but runs fine in CI... 🫠
spawn_link(fn ->
run_in_container(cmd)
case run_in_container(cmd, stderr_to_stdout: true) do
{:ok, _} ->
:ok

{:error, exit_code, output} ->
IO.puts(output)
exit({:failed_to_start_emqx, exit_code, output})
end
end)

Enum.reduce_while(1..wait_s, :error, fn _, acc ->
Expand Down Expand Up @@ -200,11 +209,68 @@ defmodule TH do
{:ok, out}
end
end

def kw_update_some(kw, key, fun) do
if Keyword.has_key?(kw, key) do
Keyword.update!(kw, key, fun)
else
kw
end
end

def parse_exunit_opts(argv) do
{opts, _pos_args} = OptionParser.parse!(argv, strict: [only: :keep])

opts
|> parse_only()
end

defp parse_only(opts) do
if filters = parse_filters(opts, :only) do
opts
|> Keyword.update(:include, filters, &(filters ++ &1))
|> Keyword.update(:exclude, [:test], &[:test | &1])
else
opts
end
end

defp parse_filters(opts, key) do
if Keyword.has_key?(opts, key) do
opts
|> Keyword.get_values(key)
|> ExUnit.Filters.parse()
end
end

def api_req!(method, path, body \\ "", opts \\ []) do

Check warning on line 246 in test/scripts/test-convert-and-load.exs

View workflow job for this annotation

GitHub Actions / integration_test / integration_tests

variable "opts" is unused (if the variable is not meant to be used, prefix it with an underscore)
api_user = "app_id"
api_pass = "4mVZvVT9CnC6Z3AYdk9C07Ecz9AuBCLblb43kk69BcxbBhP"
authn64 = Base.encode64("#{api_user}:#{api_pass}")

HTTPoison.request!(
method,
"http://localhost:#{@api_port}/api/v5/#{path}",
body,
[{"Authorization", "Basic #{authn64}"}]
)
|> Map.update!(:body, fn body ->
case Jason.decode(body) do
{:ok, json} ->
json

_ ->
body
end
end)
end
end

opts = System.argv() |> TH.parse_exunit_opts()

Mix.install([{:httpoison, "2.2.1"}, {:jason, "1.4.4"}])

ExUnit.start()
ExUnit.start(opts)

defmodule Tests do
use ExUnit.Case
Expand All @@ -228,22 +294,6 @@ defmodule Tests do
TH.import_table("emqx_app", outdir)
"""

# exposed by docker
@api_port 48083

def api_req!(method, path, body \\ "") do
api_user = "app_id"
api_pass = "4mVZvVT9CnC6Z3AYdk9C07Ecz9AuBCLblb43kk69BcxbBhP"
authn64 = Base.encode64("#{api_user}:#{api_pass}")

HTTPoison.request!(
method,
"http://localhost:#{@api_port}/api/v5/#{path}",
body,
[{"Authorization", "Basic #{authn64}"}]
)
end

setup_all do
{:ok, %{image: System.fetch_env!("EMQX_IMAGE")}}
end
Expand All @@ -269,7 +319,7 @@ defmodule Tests do
:ok = TH.import!(converted_path)

# import application should start working
resp = api_req!(:get, "mqtt/retainer/messages")
resp = TH.api_req!(:get, "mqtt/retainer/messages")

assert %HTTPoison.Response{status_code: 200} = resp
end
Expand All @@ -286,8 +336,72 @@ defmodule Tests do
resp = api_req!(:get, "mqtt/retainer/messages")

assert %HTTPoison.Response{status_code: 200} = resp
messages = Jason.decode!(resp.body)

assert %{"data" => [_, _, _, _]} = messages
assert %{"data" => [_, _, _, _]} = resp.body
end

# A few rules that send data to bridges.
# - kafka
# - mqtt
# - postgres
# - redis
# - republish
# - debug (inspect)
@tag :bridges
test "bridges 1" do
path = "test/data/bridges1.json"
{:ok, converted_path} = TH.convert!(path)
on_exit(fn -> File.rm(converted_path) end)
:ok = TH.import!(converted_path)

connectors =
TH.api_req!(:get, "connectors")
|> Map.fetch!(:body)

actions =
TH.api_req!(:get, "actions")
|> Map.fetch!(:body)

expected_connector_types =
MapSet.new([
"kafka_producer",
"mqtt",
"pgsql",
"redis"
])

assert connectors |> Enum.map(& &1["type"]) |> Enum.into(MapSet.new()) ==
expected_connector_types

# 1 kafka, 1 mqtt, 1 postgres, 3 redis
assert length(connectors) == 6

redis_types =
connectors
|> Enum.filter(&(&1["type"] == "redis"))
|> Enum.map(&get_in(&1, ["parameters", "redis_type"]))
|> MapSet.new()

assert redis_types == MapSet.new(["single", "cluster", "sentinel"])

expected_action_types =
MapSet.new([
"kafka_producer",
"mqtt",
"pgsql",
"redis"
])

assert actions |> Enum.map(& &1["type"]) |> Enum.into(MapSet.new()) == expected_action_types
# 1 kafka, 1 mqtt, 1 postgres, 3 redis
assert length(actions) == 6

redis_types =
actons
|> Enum.filter(&(&1["type"] == "redis"))
|> Enum.map(&get_in(&1, ["parameters", "redis_type"]))
|> MapSet.new()

assert redis_types == MapSet.new(["single", "cluster", "sentinel"])
end
end

0 comments on commit 3896cb9

Please sign in to comment.