Skip to content

Commit

Permalink
Refactor MQTT handling
Browse files Browse the repository at this point in the history
Remove feature guards.
Add function for single message publish.
Getters and setters.
Reset function.
Init mqtt_t on publish if mqtt_t is NULL.
  • Loading branch information
ArnoStiefvater committed Jul 13, 2021
1 parent 974e469 commit 93392e4
Show file tree
Hide file tree
Showing 2 changed files with 257 additions and 54 deletions.
299 changes: 252 additions & 47 deletions util/mqtt.c
Original file line number Diff line number Diff line change
Expand Up @@ -19,48 +19,119 @@

#include "mqtt.h"

/** TODO: Remove dependency ../base/prefs.h **/
#include "../base/prefs.h"
#include "uuidutils.h" /* gvm_uuid_make */

#include <glib.h>

#undef G_LOG_DOMAIN
#define G_LOG_DOMAIN "lib mqtt"

#define QOS 1
#define TIMEOUT 10000L

/**
* @brief Check for MQTT support
* @brief Get server uri as specified in the openvas conf file.
*
* @return 1 if gvm-libs has been built with mqtt, 0 otherwise.
* @return Server URI, NULL if not found.
*/
int
gvm_has_mqtt_support ()
static const char *
mqtt_get_server_uri ()
{
return prefs_get ("mqtt_server_uri");
}

/**
* @brief Disconnect from the Broker.
*
* @param mqtt mqtt_t
*
* @return 0 on success, -1 on error.
*/
static int
mqtt_disconnect (mqtt_t *mqtt)
{
#ifdef HAVE_MQTT
return 1;
#endif /* HAVE_MQTT */
int rc;

rc = MQTTClient_disconnect5 (mqtt->client, 200,
MQTTREASONCODE_NORMAL_DISCONNECTION, NULL);
if (rc != MQTTCLIENT_SUCCESS)
{
g_warning ("Failed to disconnect: %s", MQTTClient_strerror (rc));
return -1;
}

return 0;
}

#ifdef HAVE_MQTT
/**
* Create a new mqtt client.
* @brief Destroy the mqtt client inside mqtt_t struct
*
* @param server_uri URI of server.
* @param id Client id.
* @param[in] mqtt mqtt_t handle.
*
* @return mqtt client
*/
static void
mqtt_client_destroy (mqtt_t *mqtt)
{
MQTTClient client;
client = (MQTTClient) mqtt->client;

if (client != NULL)
{
MQTTClient_destroy (client);
client = NULL;
}

return;
}
/**
* @brief Destroy the mqtt_t data.
*
* @param mqtt mqtt_t
*/
static void
mqtt_client_data_destroy (mqtt_t *mqtt)
{
g_free (mqtt->addr);
g_free (mqtt->client_id);
g_free (mqtt);
mqtt = NULL;
}

/**
* @brief Destroy mqtt handle and mqtt_t.
*
* @param mqtt mqtt_t
*/
void
mqtt_reset (mqtt_t *mqtt)
{
mqtt_client_destroy (mqtt);
mqtt_client_data_destroy (mqtt);
return;
}

/**
* @brief Create a new mqtt client.
*
* @param mqtt mqtt_t
*
* @return mqtt client or NULL on error.
*/
static MQTTClient
mqtt_create (const char *server_uri, char *id)
mqtt_create (mqtt_t *mqtt)
{
MQTTClient client;
MQTTClient_createOptions create_opts = MQTTClient_createOptions_initializer;
create_opts.MQTTVersion = MQTTVERSION_5;

int rc = MQTTClient_createWithOptions (
&client, server_uri, id, MQTTCLIENT_PERSISTENCE_NONE, NULL, &create_opts);
if (mqtt == NULL)
return NULL;
if (mqtt->addr == NULL || mqtt->client_id == NULL)
return NULL;

int rc = MQTTClient_createWithOptions (&client, mqtt->addr, mqtt->client_id,
MQTTCLIENT_PERSISTENCE_NONE, NULL,
&create_opts);

if (rc != MQTTCLIENT_SUCCESS)
{
Expand All @@ -69,29 +140,115 @@ mqtt_create (const char *server_uri, char *id)
}
return client;
}
#endif /* HAVE_MQTT */

/**
* @brief connect to a mqtt broker.
* @brief Set a random client ID.
*
* @param server_uri Address of the broker.
* @param mqtt mqtt_t
*
* @return Mqtt handle, NULL on error.
* @return Client ID which was set, NULL on failure.
*/
mqtt_t *
mqtt_connect (const char *server_uri)
char *
mqtt_set_client_id (mqtt_t *mqtt)
{
#ifdef HAVE_MQTT
if (mqtt == NULL)
return NULL;

char *uuid;

uuid = gvm_uuid_make ();
mqtt->client_id = uuid;

return uuid;
}

/**
* @brief Set Server Addr.
*
* @param mqtt mqtt_T
* @param server_uri URI of server. E.g "tcp://127.0.0.1:1883"
*
* @return 0 on success, NULL on error.
*/
static void
mqtt_set_server_addr (mqtt_t *mqtt, const char *server_uri)
{
if (mqtt == NULL)
{
g_warning ("%s:Can not set server addr on unitialized mqtt handle.",
__func__);
return;
}

mqtt->addr = g_strdup (server_uri);
}

/**
* @brief Set client handle
*
* @param mqtt mqtt_t
* @param client Client to set
*
*/
static void
mqtt_set_client (mqtt_t *mqtt, MQTTClient client)
{
if (mqtt == NULL)
{
g_warning ("%s: Can not set clien on uninitialized mqtt handle.",
__func__);
return;
}
mqtt->client = client;
return;
}

/**
* @brief Init mqtt_t
*
* @param server_uri Server URI
*
* @return New mqtt_t, NULL on error
*/
mqtt_t *
mqtt_init (const char *server_uri)
{
mqtt_t *mqtt = NULL;

mqtt = g_malloc0 (sizeof (mqtt));

// Set random uuid as client id
if (mqtt_set_client_id (mqtt) == NULL)
{
g_warning ("%s: Could not set client id.", __func__);
g_free (mqtt);
return NULL;
}
mqtt_set_server_addr (mqtt, server_uri);
mqtt_set_client (mqtt, NULL);

return mqtt;
}

/**
* @brief Make new client and connect to mqtt broker.
*
* @param mqtt Initialized mqtt_t
*
* @return mqtt_t handle, NULL on error.
*/
static mqtt_t *
mqtt_connect (mqtt_t *mqtt)
{
int rc;
MQTTClient client;
mqtt_t *mqtt = NULL;
MQTTClient_connectOptions conn_opts = MQTTClient_connectOptions_initializer5;
MQTTProperties connect_properties = MQTTProperties_initializer;
MQTTResponse resp;

uuid = gvm_uuid_make ();
client = mqtt_create (server_uri, uuid);
if (mqtt == NULL)
return NULL;
client = mqtt_create (mqtt);
if (!client)
return NULL;

Expand All @@ -105,22 +262,15 @@ mqtt_connect (const char *server_uri)
if (rc != MQTTCLIENT_SUCCESS)
{
g_log (G_LOG_DOMAIN, G_LOG_LEVEL_CRITICAL,
"%s: mqtt connection error to %s: %s", __func__, server_uri,
"%s: mqtt connection error to %s: %s", __func__, mqtt->addr,
MQTTClient_strerror (rc));
MQTTResponse_free (resp);
return NULL;
}

mqtt = g_malloc0 (sizeof (mqtt));
mqtt->client = client;
mqtt->addr = g_strdup (server_uri);
mqtt->client_id = uuid;
mqtt_set_client (mqtt, client);

return mqtt;
#else
(void) server_uri;
return NULL;
#endif /* HAVE_MQTT */
}

/**
Expand All @@ -130,21 +280,36 @@ mqtt_connect (const char *server_uri)
* @param topic Topic to publish on.
* @param msg Message to publish on queue.
*
* @return 0 on success, negative errorcode on failure.
* @return 0 on success, <0 on failure.
*/
int
mqtt_publish (mqtt_t *mqtt, const char *topic, const char *msg)
{
#ifdef HAVE_MQTT
MQTTClient client;
MQTTClient_message pubmsg = MQTTClient_message_initializer;
MQTTClient_deliveryToken token;
MQTTResponse resp;
int rc;
const char *mqtt_server_uri;

// init mqtt and make new connection
if (mqtt == NULL)
{
mqtt_server_uri = mqtt_get_server_uri ();
if (mqtt_server_uri)
mqtt = mqtt_init (mqtt_server_uri);
if (mqtt == NULL)
return -1;
mqtt = mqtt_connect (mqtt);
if (mqtt == NULL)
return -2;
}

client = mqtt->client;
if (!client)
return -1;
if (client == NULL)
{
return -3;
}

pubmsg.payload = (char *) msg;
pubmsg.payloadlen = (int) strlen (msg);
Expand All @@ -157,7 +322,7 @@ mqtt_publish (mqtt_t *mqtt, const char *topic, const char *msg)
{
g_warning ("Failed to connect: %s", MQTTClient_strerror (rc));
MQTTResponse_free (resp);
return -1;
return -4;
}

if ((rc = MQTTClient_waitForCompletion (client, token, TIMEOUT))
Expand All @@ -169,10 +334,50 @@ mqtt_publish (mqtt_t *mqtt, const char *topic, const char *msg)
}

return rc;
#else
(void) mqtt;
(void) topic;
(void) msg;
return -1;
#endif /* HAVE_MQTT */
}

/**
* @brief Send a single message.
*
* This functions creates a mqtt handle, connects, sends the message, closes
* the connection and destroys the handler.
* This function should not be chosen for repeated and frequent messaging. Its
* meant for Error messages and the likes emitted by openvas.
*
* @param topic Topic to publish to
* @param msg Message to publish
*
* @return 0 on success, <0 on failure.
*/
int
mqtt_publish_single_message (const char *topic, const char *msg)
{
const char *mqtt_server_uri;
mqtt_t *mqtt = NULL;

mqtt_server_uri = mqtt_get_server_uri ();
if (mqtt_server_uri)
mqtt = mqtt_init (mqtt_server_uri);
if (mqtt == NULL)
return -1;
mqtt = mqtt_connect (mqtt);
if (mqtt == NULL)
{
mqtt_reset (mqtt);
return -2;
}
if (mqtt_publish (mqtt, topic, msg) != 0)
{
mqtt_disconnect (mqtt);
mqtt_reset (mqtt);
return -3;
}
if (mqtt_disconnect (mqtt) != 0)
{
mqtt_reset (mqtt);
return -4;
}
mqtt_reset (mqtt);

return 0;
}
Loading

0 comments on commit 93392e4

Please sign in to comment.