#!benchDL

#######
# 1_to_1 Scenario:
# 1000 clients subscribe to an exclusive topic: "channels/{channel_id}/messages"
# The same 1000 clients send messages on that topic to themselves
# Overall Msg rate: 1k msg/s
# Message Size: 100 random bytes
# Runtime: 5 min
# QoS level: 2
# NOTE: JSON array of Mainfux pre-provisioned things' IDs, things' keys
# and channels' IDs must be in the same directory with scenario in files:
# things_id.json, things_key.json and channels_id.json
# Scenario's values can be changed with environment variables predefined in defaults
# Set environment variables to connect to Mainflux instance:
# MF_MZBENCH_MQTT_ENPOINT = IP/domain of MQTT endpoint on Mainflux
# MF_MZBENCH_MQTT_PORT = Port of MQTT endpoint on Mainflux
#######

include_resource(things_id, "things_id.json", json)
include_resource(things_key, "things_key.json", json)
include_resource(channels_id, "channels_id.json", json)

defaults(
        "MF_MZBENCH_PUB_NUM" = 1000,
        "MF_MZBENCH_PUB_TIME" = 5,
        "MF_MZBENCH_PUB_RATE" = 1,
        "MF_MZBENCH_MSG_SIZE" = 100,
        "MF_MZBENCH_QOS" = 2,
        "MF_MZBENCH_MQTT_ENDPOINT" = "127.0.0.1",
        "MF_MZBENCH_MQTT_PORT" = 1883
)

make_install(git = "https://github.com/erlio/vmq_mzbench.git",
             branch = "master")

pool(size =  numvar("MF_MZBENCH_PUB_NUM"),
     worker_type = mqtt_worker,
     worker_start = poisson(numvar("MF_MZBENCH_PUB_NUM") rps)):
            connect([t(host, var("MF_MZBENCH_MQTT_ENDPOINT")),
                    t(port, numvar("MF_MZBENCH_MQTT_PORT")),
                    t(client,fixed_client_id("pool1", worker_id())),
                    t(clean_session,true),
                    t(keepalive_interval,60),
                    t(username, round_robin(resource(things_id))),
                    t(password, round_robin(resource(things_key))),
                    t(proto_version,4), t(reconnect_timeout,4)
                    ])

            wait(1 sec)
            subscribe(sprintf("channels/~s/messages", [round_robin(resource(channels_id))]), numvar("MF_MZBENCH_QOS"))
            set_signal(pool1, 1)
            wait_signal(pool1, numvar("MF_MZBENCH_PUB_NUM"))
            loop(time = numvar("MF_MZBENCH_PUB_TIME") min, rate = numvar("MF_MZBENCH_PUB_RATE") rps):
                publish(sprintf("channels/~s/messages", [round_robin(resource(channels_id))]), random_binary(numvar("MF_MZBENCH_MSG_SIZE")), numvar("MF_MZBENCH_QOS"))
            disconnect()