Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

mqtt_connect fails with return -2 #23184

Closed
md-jamal opened this issue Mar 1, 2020 · 10 comments
Closed

mqtt_connect fails with return -2 #23184

md-jamal opened this issue Mar 1, 2020 · 10 comments
Assignees

Comments

@md-jamal
Copy link

md-jamal commented Mar 1, 2020

Hi Guys,

I tried to connect to MQTT Amazon Web Services IOT Service. Wrote a basic code by following other two cloud samples. Please have a look at the below code

/*
 * Copyright (c) 2019 Intel Corporation
 *
 * SPDX-License-Identifier: Apache-2.0
 */

#include <logging/log.h>
LOG_MODULE_REGISTER(aws_mqtt, LOG_LEVEL_DBG);

#include <zephyr.h>
#include <net/socket.h>
#include <net/mqtt.h>
#include <net/net_config.h>
#include <net/net_event.h>
#include <net/sntp.h>

#include <sys/printk.h>
#include <string.h>
#include <errno.h>
#include "dhcp.h"
#include <time.h>
#include <inttypes.h>
#include "globalsign.inc"
#if defined(CONFIG_SOCKS)
static struct sockaddr socks5_proxy;
#endif
static struct pollfd fds[1];
static int nfds;

#define CONFIG_SAMPLE_CLOUD_AWS_PASSWORD	"xxx"
#define CONFIG_SAMPLE_CLOUD_AWS_USERNAME	"xxxxxx@gmail.com"

#define APP_MQTT_BUFFER_SIZE    128
#define APP_CA_CERT_TAG 1
#define APP_PRIVATE_KEY_TAG 2
#define APP_SERVER_CERT_TAG 3

static sec_tag_t m_sec_tags[] = {

    APP_CA_CERT_TAG,
    APP_PRIVATE_KEY_TAG,
    APP_SERVER_CERT_TAG,
};


/* Buffers for MQTT client. */
static u8_t rx_buffer[APP_MQTT_BUFFER_SIZE];
static u8_t tx_buffer[APP_MQTT_BUFFER_SIZE];
static bool connected;

#define MQTT_CLIENTID           "zephyr_publisher"

struct zsock_addrinfo *haddr;
s64_t time_base;
static struct sockaddr_storage broker;
static struct mqtt_client client_ctx;

static void clear_fds(void)
{
	nfds = 0;
}

static void broker_init(void)
{
	struct sockaddr_in *broker4 = (struct sockaddr_in *)&broker;

	broker4->sin_family = AF_INET;
	broker4->sin_port = htons(8883);

	net_ipaddr_copy(&broker4->sin_addr,
			&net_sin(haddr->ai_addr)->sin_addr);

}

void mqtt_evt_handler(struct mqtt_client *const client,
		const struct mqtt_evt *evt)
{
	int err;

	switch (evt->type) {
		case MQTT_EVT_CONNACK:
			if (evt->result != 0) {
				LOG_ERR("MQTT connect failed %d", evt->result);
				break;
			}

			connected = true;
			LOG_INF("MQTT client connected!");

			break;

		case MQTT_EVT_DISCONNECT:
			LOG_INF("MQTT client disconnected %d", evt->result);

			connected = false;
			clear_fds();

			break;

		case MQTT_EVT_PUBACK:
			if (evt->result != 0) {
				LOG_ERR("MQTT PUBACK error %d", evt->result);
				break;
			}

			LOG_INF("PUBACK packet id: %u", evt->param.puback.message_id);

			break;

		case MQTT_EVT_PUBREC:
			if (evt->result != 0) {
				LOG_ERR("MQTT PUBREC error %d", evt->result);
				break;
			}

			LOG_INF("PUBREC packet id: %u", evt->param.pubrec.message_id);

			const struct mqtt_pubrel_param rel_param = {
				.message_id = evt->param.pubrec.message_id
			};
			err = mqtt_publish_qos2_release(client, &rel_param);
			if (err != 0) {
				LOG_ERR("Failed to send MQTT PUBREL: %d", err);
			}

			break;

		case MQTT_EVT_PUBCOMP:
			if (evt->result != 0) {
				LOG_ERR("MQTT PUBCOMP error %d", evt->result);
				break;
			}

			LOG_INF("PUBCOMP packet id: %u",
					evt->param.pubcomp.message_id);

			break;

		default:
			break;
	}
}


static void client_init(struct mqtt_client *client)
{
        static struct mqtt_utf8 password;
        static struct mqtt_utf8 username;

	mqtt_client_init(client);
	broker_init();


	password.utf8 = (u8_t *)CONFIG_SAMPLE_CLOUD_AWS_PASSWORD;
	password.size = strlen(CONFIG_SAMPLE_CLOUD_AWS_PASSWORD);

	client->password = &password;

	username.utf8 = (u8_t *)CONFIG_SAMPLE_CLOUD_AWS_USERNAME;
	username.size = strlen(CONFIG_SAMPLE_CLOUD_AWS_USERNAME);

	client->user_name = &username;


	/* MQTT client configuration */
	client->broker = &broker;
	client->evt_cb = mqtt_evt_handler;
	client->client_id.utf8 = (u8_t *)MQTT_CLIENTID;
	client->client_id.size = strlen(MQTT_CLIENTID);
	client->protocol_version = MQTT_VERSION_3_1_1;

	/* MQTT buffers configuration */
	client->rx_buf = rx_buffer;
	client->rx_buf_size = sizeof(rx_buffer);
	client->tx_buf = tx_buffer;
	client->tx_buf_size = sizeof(tx_buffer);

	client->transport.type = MQTT_TRANSPORT_SECURE;

	struct mqtt_sec_config *tls_config = &client->transport.tls.config;

	tls_config->peer_verify = TLS_PEER_VERIFY_REQUIRED;
	tls_config->cipher_list = NULL;
	tls_config->sec_tag_list = m_sec_tags;
	tls_config->sec_tag_count = ARRAY_SIZE(m_sec_tags);
	tls_config->hostname = "agwgaprlb6xdl-ats.iot.us-west-2.amazonaws.com";
	client->transport.type = MQTT_TRANSPORT_SECURE;


}
#define RC_STR(rc) ((rc) == 0 ? "OK" : "ERROR")
#define PRINT_RESULT(func, rc) \
	LOG_INF("%s: %d <%s>", (func), rc, RC_STR(rc))
static void prepare_fds(struct mqtt_client *client)
{
	if (client->transport.type == MQTT_TRANSPORT_NON_SECURE) {
		fds[0].fd = client->transport.tcp.sock;
	}
#if defined(CONFIG_MQTT_LIB_TLS)
	else if (client->transport.type == MQTT_TRANSPORT_SECURE) {
		fds[0].fd = client->transport.tls.sock;
	}
#endif

	fds[0].events = ZSOCK_POLLIN;
	nfds = 1;
}

static void wait(int timeout)
{
	if (nfds > 0) {
		if (poll(fds, nfds, timeout) < 0) {
			LOG_ERR("poll error: %d", errno);
		}
	}
}



static int try_to_connect(struct mqtt_client *client)
{
	u8_t retries = 3U;
	int rc;

	LOG_DBG("attempting to connect...");

	client_init(client);
	rc = mqtt_connect(client);
	if (rc != 0) {
		PRINT_RESULT("mqtt_connect", rc);
		k_sleep(500);
	}
	if (connected) {
		return 0;
	}

	return -EINVAL;

}


static int tls_init(void)
{
        int err = -EINVAL;
	LOG_INF("Ca size:%d\n", sizeof(amazon_certificate));
        err = tls_credential_add(APP_CA_CERT_TAG, TLS_CREDENTIAL_CA_CERTIFICATE,
                                 amazon_certificate, sizeof(amazon_certificate));
        if (err < 0) {
                LOG_ERR("Failed to register public certificate: %d", err);
                return err;
        }

        err = tls_credential_add(APP_PRIVATE_KEY_TAG, TLS_CREDENTIAL_PRIVATE_KEY,
                                 private_key, sizeof(private_key));
        if (err < 0) {
                LOG_ERR("Failed to register private key: %d", err);
                return err;
        }

        err = tls_credential_add(APP_SERVER_CERT_TAG, TLS_CREDENTIAL_SERVER_CERTIFICATE,
                                 server_cert, sizeof(server_cert));
        if (err < 0) {
                LOG_ERR("Failed to register server certificate: %d", err);
                return err;
        }

	

        return err;
}


void mqtt_startup(char *hostname, int port)
{
	struct mqtt_client *client = &client_ctx;
	int retries = 5;
	int err, cnt;	
	static struct zsock_addrinfo hints;
	int res = 0;
	int rc;


	hints.ai_family = AF_INET;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_protocol = 0;
	cnt = 0;
	while ((err = getaddrinfo("agwgaprlb6xdl-ats.iot.us-west-2.amazonaws.com", "8883", &hints,
					&haddr)) && cnt < 3) {
		LOG_ERR("Unable to get address for broker, retrying");
		cnt++;
	}
	if (err != 0) {
		LOG_ERR("Unable to get address for broker, error %d",
				res);
		return;
	}
	LOG_INF("DNS resolved for agwgaprlb6xdl-ats.iot.us-west-2.amazonaws.com:8883");
	rc = try_to_connect(client);
	PRINT_RESULT("try_to_connect", rc);




}

static void show_addrinfo(struct addrinfo *addr)
{
	char hr_addr[NET_IPV6_ADDR_LEN];
	void *a;

top:
	LOG_DBG("  flags   : %d", addr->ai_flags);
	LOG_DBG("  family  : %d", addr->ai_family);
	LOG_DBG("  socktype: %d", addr->ai_socktype);
	LOG_DBG("  protocol: %d", addr->ai_protocol);
	LOG_DBG("  addrlen : %d", (int)addr->ai_addrlen);

	/* Assume two words. */
	LOG_DBG("   addr[0]: 0x%lx", ((uint32_t *)addr->ai_addr)[0]);
	LOG_DBG("   addr[1]: 0x%lx", ((uint32_t *)addr->ai_addr)[1]);

	if (addr->ai_next != 0) {
		addr = addr->ai_next;
		goto top;
	}

	a = &net_sin(addr->ai_addr)->sin_addr;
	LOG_INF("  Got %s",
			log_strdup(net_addr_ntop(addr->ai_family, a,
					hr_addr, sizeof(hr_addr))));

}




void main(void)
{
	static struct addrinfo hints;
	struct addrinfo *haddr;
	int res;
	int cnt = 0;
        int rc;


	app_dhcpv4_startup();

	LOG_INF("Should have DHCPv4 lease at this point.");

        rc = tls_init();
        PRINT_RESULT("tls_init", rc);
	mqtt_startup("agwgaprlb6xdl-ats.iot.us-west-2.amazonaws.com", 8883);	

}


It fails with the following messages

[00:00:00.010,000] <inf> eth_sam: Queue 0 activated
[00:00:00.016,000] <inf> eth_sam: Queue 1 set to idle
[00:00:00.023,000] <inf> eth_sam: Queue 2 set to idle
[00:00:00.030,000] <inf> eth_sam: Queue 3 set to idle
[00:00:00.037,000] <inf> eth_sam: Queue 4 set to idle
[00:00:00.043,000] <inf> eth_sam: Queue 5 set to idle
[00:00:00.050,000] <inf> eth_sam_phy: Soft Reset of ETH PHY
uart:[00:00:00.148,000] <inf> eth_sam_phy: PHYID: 0x221561 at addr: 0
[00:00:02.290,000] <inf> eth_sam_phy: common abilities: speed 100 Mb, full duplex
[00:00:02.300,000] <dbg> net_if.net_if_post_init: (0x20401b34):
[00:00:02.309,000] <dbg> net_if.net_if_up: (0x20401b34): iface 0x204230c0
uart:~$ *** Booting Zephyr OS build zephyr-v2.1.0-1382-gcf89ba33eae9  ***
[00:00:02.324,000] <inf> net_sock_tls: tls_init
[00:00:02.331,000] <inf> net_sock_tls: Debug level set
[00:00:02.339,000] <inf> net_config: Initializing network
[00:00:02.347,000] <inf> net_config: Running dhcpv4 client...
[00:00:05.355,000] <dbg> net_if.net_if_tx: (0x20401328): Processing (pkt 0x2041e47c, prio 0) network packet
[00:00:05.367,000] <inf> aws_mqtt: starting DHCPv4
[00:00:05.375,000] <dbg> net_if.net_if_tx: (0x20401328): Processing (pkt 0x2041e47c, prio 0) network packet
[00:00:05.388,000] <inf> net_dhcpv4: Received: 192.168.0.104
[00:00:05.396,000] <dbg> net_if.net_if_ipv4_addr_add: (0x204012a4): [0] interface 0x204230c0 address 192.168.0.104 type DHCP added
[00:00:05.410,000] <inf> net_config: IPv4 address: 192.168.0.104
[00:00:05.419,000] <inf> net_config: Lease time: 86400 seconds
[00:00:05.427,000] <inf> net_config: Subnet: 255.255.255.0
[00:00:05.435,000] <inf> net_config: Router: 192.168.0.1
[00:00:05.443,000] <inf> aws_mqtt: Should have DHCPv4 lease at this point.
[00:00:05.452,000] <inf> aws_mqtt: Ca size:1239

[00:00:05.460,000] <inf> aws_mqtt: tls_init: 0 <OK>
[00:00:05.467,000] <dbg> net_if.net_if_tx: (0x20401328): Processing (pkt 0x2041e47c, prio 0) network packet
[00:00:05.479,000] <dbg> net_if.net_if_tx: (0x20401328): Calling context send cb 0x20400fac status 42
[00:00:05.491,000] <dbg> net_if.net_if_tx: (0x20401328): Processing (pkt 0x2041e47c, prio 0) network packet
[00:00:05.503,000] <dbg> net_if.net_if_tx: (0x20401328): Calling context send cb 0x20400fac status 105
[00:00:05.552,000] <dbg> net_sock_addr.dns_resolve_cb: (0x204012a4): dns status: -100
[00:00:05.562,000] <dbg> net_sock_addr.dns_resolve_cb: (0x204012a4): dns status: -100
[00:00:05.572,000] <dbg> net_sock_addr.dns_resolve_cb: (0x204012a4): dns status: -100
[00:00:05.583,000] <dbg> net_sock_addr.dns_resolve_cb: (0x204012a4): getaddrinfo entries overflow
[00:00:05.594,000] <dbg> net_sock_addr.dns_resolve_cb: (0x204012a4): dns status: -103
[00:00:05.604,000] <inf> aws_mqtt: DNS resolved for agwgaprlb6xdl-ats.iot.us-west-2.amazonaws.com:8833
[00:00:05.616,000] <dbg> aws_mqtt.try_to_connect: attempting to connect...
[00:00:05.626,000] <dbg> net_sock_tls.tls_alloc: (0x20401b34): Allocated TLS context, 0x20400710
[00:00:05.637,000] <dbg> net_mqtt_sock_tls.mqtt_client_tls_connect: (0x20401b34): Created socket 0
[00:00:05.648,000] <dbg> net_mqtt_sock_tls.mqtt_client_tls_connect: (0x20401b34): Before Connect completed
[00:00:05.661,000] <dbg> net_tcp.net_tcp_change_state: (0x20401b34): [0x204014a8] state CLOSED (0) => SYN_SENT (2)
[00:00:05.674,000] <dbg> net_tcp.net_tcp_trace: (0x20401b34): [0x204014a8] pkt 0x2041e47c src 36403 dst 8883
[00:00:05.686,000] <dbg> net_tcp.net_tcp_trace: (0x20401b34):   seq 0xae875811 (2928105489) ack 0x0000 (0/0)
[00:00:05.698,000] <dbg> net_tcp.net_tcp_trace: (0x20401b34):   flags uaprSf
[00:00:05.708,000] <dbg> net_tcp.net_tcp_trace: (0x20401b34):   win 1280 chk 0x04c4
[00:00:05.718,000] <dbg> net_tcp.print_send_info: (0x20401b34): SYN sent to 54.68.223.142 port 8883
[00:00:05.730,000] <dbg> net_if.net_if_tx: (0x20401328): Processing (pkt 0x2041e47c, prio 0) network packet
[00:00:05.742,000] <dbg> net_if.net_if_tx: (0x20401328): Calling context send cb 0x20401024 status 58
[00:00:05.986,000] <dbg> net_tcp.net_tcp_change_state: (0x204012a4): [0x204014a8] state SYN_SENT (2) => ESTABLISHED (4)
[00:00:05.999,000] <dbg> net_tcp.net_tcp_trace: (0x204012a4): [0x204014a8] pkt 0x2041e47c src 36403 dst 8883
[00:00:06.011,000] <dbg> net_tcp.net_tcp_trace: (0x204012a4):   seq 0xae875812 (2928105490) ack 0xdda0a4c6 (3718292678/0)
[00:00:06.025,000] <dbg> net_tcp.net_tcp_trace: (0x204012a4):   flags uAprsf
[00:00:06.034,000] <dbg> net_tcp.net_tcp_trace: (0x204012a4):   win 1280 chk 0x9a09
[00:00:06.045,000] <dbg> net_tcp.print_send_info: (0x204012a4): ACK sent to 54.68.223.142 port 8883
[00:00:06.056,000] <dbg> net_if.net_if_tx: (0x20401328): Processing (pkt 0x2041e47c, prio 0) network packet
[00:00:06.068,000] <dbg> net_if.net_if_tx: (0x20401328): Calling context send cb 0x20401024 status 54
[00:00:06.080,000] <dbg> net_mqtt_sock_tls.mqtt_client_tls_connect: (0x20401b34): error 1
[00:00:06.091,000] <dbg> net_mqtt_sock_tls.mqtt_client_tls_connect: (0x20401b34): Error received
[00:00:06.102,000] <dbg> net_sock.z_impl_zsock_close: (0x20401b34): close: ctx=0x20401024, fd=0
[00:00:06.114,000] <dbg> net_tcp.net_tcp_change_state: (0x20401b34): [0x204014a8] state ESTABLISHED (4) => CLOSED (0)
[00:00:06.127,000] <dbg> net_tcp.net_tcp_release: (0x20401b34): [0x204014a8] Disposed of TCP connection state
[00:00:06.139,000] <inf> aws_mqtt: mqtt_connect: -2 <ERROR>
[00:00:06.647,000] <inf> aws_mqtt: try_to_connect: -22 <ERROR>

Following is my prj.conf file

CONFIG_NETWORKING=y

# Disable IPv6 support
CONFIG_NET_IPV6=n

# Enable IPv4 support
CONFIG_NET_IPV4=y
CONFIG_NET_IF_MAX_IPV4_COUNT=2
CONFIG_NET_IF_UNICAST_IPV4_ADDR_COUNT=3

# Enable TCP support
CONFIG_NET_TCP=y

# Enable Sockets support
CONFIG_NET_SOCKETS=y
CONFIG_NET_SOCKETS_SOCKOPT_TLS=y

# Enable DHCPv4 support
CONFIG_NET_DHCPV4=y

# Enable SOCKS5 proxy support
CONFIG_SOCKS=n

# Enable MQTT Lib support
CONFIG_MQTT_LIB=y
CONFIG_MQTT_LIB_TLS=y


# Network configuration
CONFIG_NET_CONFIG_SETTINGS=y

# Network connection manager
CONFIG_NET_CONNECTION_MANAGER=y

CONFIG_NET_MGMT_EVENT_STACK_SIZE=1024
CONFIG_NET_MGMT_EVENT_QUEUE_SIZE=5
CONFIG_NET_MGMT_EVENT_LOG_LEVEL_DBG=n

CONFIG_DNS_RESOLVER=y
CONFIG_DNS_SERVER_IP_ADDRESSES=y
CONFIG_DNS_SERVER1="8.8.8.8"
CONFIG_DNS_RESOLVER_ADDITIONAL_BUF_CTR=2
CONFIG_NEWLIB_LIBC=y

CONFIG_INIT_STACKS=y
CONFIG_NET_SHELL=y

CONFIG_MAIN_STACK_SIZE=4096

# Enable Logging support
CONFIG_LOG_IMMEDIATE=y
CONFIG_NET_LOG=y
CONFIG_NET_TCP_LOG_LEVEL_DBG=n
CONFIG_NET_SOCKETS_LOG_LEVEL_DBG=y
CONFIG_MQTT_LOG_LEVEL_DBG=n
CONFIG_NET_DHCPV4_LOG_LEVEL_INF=n
CONFIG_NET_IF_LOG_LEVEL_DBG=n
CONFIG_SOCKS_LOG_LEVEL_DBG=n
CONFIG_NET_CONFIG_LOG_LEVEL_DBG=y
CONFIG_NET_CONNECTION_MANAGER_LOG_LEVEL_DBG=n
CONFIG_DNS_RESOLVER_LOG_LEVEL_DBG=n
CONFIG_NET_CONTEXT_LOG_LEVEL_DBG=n
CONFIG_SOC_PART_NUMBER_SAME70Q21B=y
CONFIG_MQTT_LOG_LEVEL_DBG=y
CONFIG_SNTP=y


CONFIG_NET_SOCKETS_POSIX_NAMES=y

CONFIG_MAIN_STACK_SIZE=4096
CONFIG_MQTT_LIB_TLS=y
# Enable TLS credential filenames for secure socket offload
#CONFIG_NET_SOCKETS_OFFLOAD=y

# Enable Mbed TLS configuration
CONFIG_MBEDTLS=y
CONFIG_MBEDTLS_BUILTIN=y
CONFIG_MBEDTLS_ENABLE_HEAP=y
CONFIG_MBEDTLS_HEAP_SIZE=100000
CONFIG_MBEDTLS_SSL_MAX_CONTENT_LEN=10240
CONFIG_MBEDTLS_PEM_CERTIFICATE_FORMAT=y
CONFIG_MBEDTLS_DEBUG=y
CONFIG_MBEDTLS_DEBUG_LEVEL=4

@md-jamal md-jamal added the bug The issue is a bug, or the PR is fixing a bug label Mar 1, 2020
@md-jamal md-jamal changed the title mqtt_connect fails with net_sock_tls: TLS handshake error: -2700 mqtt_connect fails with return -2 Mar 1, 2020
@rlubos
Copy link
Contributor

rlubos commented Mar 3, 2020

Please remove the bug label unless you have a specific bug report.

The original title suggested that there is a problem with CA certificate used (0x2700 is MBEDTLS_ERR_X509_CERT_VERIFY_FAILED). Did anything change in that matter which caused the title change?

I recommend to enable mbed TLS logs in order to proceed with investigation. Try adding the following:

CONFIG_MBEDTLS_DEBUG=y
CONFIG_MBEDTLS_DEBUG_LEVEL=4
CONFIG_NET_SOCKETS_LOG_LEVEL_DBG=y

@carlescufi carlescufi added question and removed bug The issue is a bug, or the PR is fixing a bug labels Mar 3, 2020
@md-jamal
Copy link
Author

md-jamal commented Mar 3, 2020

How to enable 'MBEDTLS_X509_CRT_PARSE_C' this particular option
From subsys/net/lib/sockets/sockets_tls.c, line number 331

#if defined(MBEDTLS_X509_CRT_PARSE_C)
		mbedtls_x509_crt_init(&tls->ca_chain);
		mbedtls_x509_crt_init(&tls->own_cert);
		mbedtls_pk_init(&tls->priv_key);
#endif

@rlubos
Copy link
Contributor

rlubos commented Mar 3, 2020

It's enabled by default when certificate-based cipher suite is selected:
https://github.com/zephyrproject-rtos/mbedtls/blob/master/configs/config-tls-generic.h#L336

@md-jamal
Copy link
Author

md-jamal commented Mar 4, 2020

I have enabled CONFIG_MBEDTLS_KEY_EXCHANGE_RSA_PSK_ENABLED=y in prj.conf

Then i have added the following statement in src/main.c

	app_dhcpv4_startup();
#if defined(MBEDTLS_X509_CRT_PARSE_C)
	LOG_INF("Should have DHCPv4 lease at this point.");
#endif

After flashing the code on to the device, and looking at serial logs, this log is not present, hence my question

@md-jamal
Copy link
Author

md-jamal commented Mar 5, 2020

Got the zephyr able to communicate with aws, if anyone is interested

#include <logging/log.h>
LOG_MODULE_REGISTER(aws_mqtt, LOG_LEVEL_DBG);

#include <zephyr.h>
#include <net/socket.h>
#include <net/mqtt.h>
#include <net/net_config.h>
#include <net/net_event.h>
#include <net/sntp.h>

#include <sys/printk.h>
#include <string.h>
#include <errno.h>
#include "dhcp.h"
#include <time.h>
#include <inttypes.h>
#include "globalsign.inc"
#if defined(CONFIG_SOCKS)
static struct sockaddr socks5_proxy;
#endif
static struct pollfd fds[1];
static int nfds;

#define ALIVE_TIME      (MSEC_PER_SEC * 60U)
#define APP_MQTT_BUFFER_SIZE    128
#define APP_CA_CERT_TAG 1
#define APP_PRIVATE_SERVER_KEY_TAG 2

static sec_tag_t m_sec_tags[] = {

    APP_CA_CERT_TAG,
    APP_PRIVATE_SERVER_KEY_TAG,
};


/* Buffers for MQTT client. */
static u8_t rx_buffer[APP_MQTT_BUFFER_SIZE];
static u8_t tx_buffer[APP_MQTT_BUFFER_SIZE];
static bool connected;
static u64_t next_alive;
#define MQTT_CLIENTID           "zephyr_publisher"

struct zsock_addrinfo *haddr;
s64_t time_base;
static struct sockaddr_storage broker;
static struct mqtt_client client_ctx;

static void clear_fds(void)
{
	nfds = 0;
}

static void broker_init(void)
{
	struct sockaddr_in *broker4 = (struct sockaddr_in *)&broker;

	broker4->sin_family = AF_INET;
	broker4->sin_port = htons(8883);

	net_ipaddr_copy(&broker4->sin_addr,
			&net_sin(haddr->ai_addr)->sin_addr);

}

void mqtt_evt_handler(struct mqtt_client *const client,
		const struct mqtt_evt *evt)
{
	int err;

	switch (evt->type) {
		case MQTT_EVT_CONNACK:
			if (evt->result != 0) {
				LOG_ERR("MQTT connect failed %d", evt->result);
				break;
			}

			connected = true;
			LOG_INF("MQTT client connected!");

			break;

		case MQTT_EVT_DISCONNECT:
			LOG_INF("MQTT client disconnected %d", evt->result);

			connected = false;
			clear_fds();

			break;

		case MQTT_EVT_PUBACK:
			if (evt->result != 0) {
				LOG_ERR("MQTT PUBACK error %d", evt->result);
				break;
			}

			LOG_INF("PUBACK packet id: %u", evt->param.puback.message_id);

			break;

		case MQTT_EVT_PUBREC:
			if (evt->result != 0) {
				LOG_ERR("MQTT PUBREC error %d", evt->result);
				break;
			}

			LOG_INF("PUBREC packet id: %u", evt->param.pubrec.message_id);

			const struct mqtt_pubrel_param rel_param = {
				.message_id = evt->param.pubrec.message_id
			};
			err = mqtt_publish_qos2_release(client, &rel_param);
			if (err != 0) {
				LOG_ERR("Failed to send MQTT PUBREL: %d", err);
			}

			break;

		case MQTT_EVT_PUBCOMP:
			if (evt->result != 0) {
				LOG_ERR("MQTT PUBCOMP error %d", evt->result);
				break;
			}

			LOG_INF("PUBCOMP packet id: %u",
					evt->param.pubcomp.message_id);

			break;

		default:
			break;
	}
}


static void client_init(struct mqtt_client *client)
{
        static struct mqtt_utf8 password;
        static struct mqtt_utf8 username;

	mqtt_client_init(client);
	broker_init();


	client->password = NULL;
	client->user_name = NULL;

	/* MQTT client configuration */
	client->broker = &broker;
	client->evt_cb = mqtt_evt_handler;
	client->client_id.utf8 = (u8_t *)MQTT_CLIENTID;
	client->client_id.size = strlen(MQTT_CLIENTID);
	client->protocol_version = MQTT_VERSION_3_1_1;

	/* MQTT buffers configuration */
	client->rx_buf = rx_buffer;
	client->rx_buf_size = sizeof(rx_buffer);
	client->tx_buf = tx_buffer;
	client->tx_buf_size = sizeof(tx_buffer);

	client->transport.type = MQTT_TRANSPORT_SECURE;

	struct mqtt_sec_config *tls_config = &client->transport.tls.config;

	tls_config->peer_verify = TLS_PEER_VERIFY_REQUIRED;
	tls_config->cipher_list = NULL;
	tls_config->sec_tag_list = m_sec_tags;
	//tls_config->sec_tag_count = 1;
	tls_config->sec_tag_count = ARRAY_SIZE(m_sec_tags);
	tls_config->hostname = CONFIG_AWS_HOSTNAME;
	client->transport.type = MQTT_TRANSPORT_SECURE;


}
#define RC_STR(rc) ((rc) == 0 ? "OK" : "ERROR")
#define PRINT_RESULT(func, rc) \
	LOG_INF("%s: %d <%s>", (func), rc, RC_STR(rc))
static void prepare_fds(struct mqtt_client *client)
{
	if (client->transport.type == MQTT_TRANSPORT_NON_SECURE) {
		fds[0].fd = client->transport.tcp.sock;
	}
#if defined(CONFIG_MQTT_LIB_TLS)
	else if (client->transport.type == MQTT_TRANSPORT_SECURE) {
		fds[0].fd = client->transport.tls.sock;
	}
#endif

	fds[0].events = ZSOCK_POLLIN;
	nfds = 1;
}

static void wait(int timeout)
{
	if (nfds > 0) {
		if (poll(fds, nfds, timeout) < 0) {
			LOG_ERR("poll error: %d", errno);
		}
	}
}



static int try_to_connect(struct mqtt_client *client)
{
	u8_t retries = 3U;
	int rc;

	LOG_DBG("attempting to connect...");

	client_init(client);
	rc = mqtt_connect(client);
	if (rc != 0) {
		PRINT_RESULT("mqtt_connect", rc);
		k_sleep(500);
	}else {
		prepare_fds(client);

		wait(500);
		mqtt_input(client);
		if (!connected) {
			LOG_INF("Aborting connection\n");
			mqtt_abort(client);
		}

	}
	if (connected) {
		return 0;
	}

	return -EINVAL;

}

static int wait_for_input(int timeout)
{
        int res;
        struct zsock_pollfd fds[1] = {
                [0] = {.fd = client_ctx.transport.tls.sock,
                      .events = ZSOCK_POLLIN,
                      .revents = 0},
        };

        res = zsock_poll(fds, 1, timeout);
        if (res < 0) {
                LOG_ERR("poll read event error");
                return -errno;
        }

        return res;
}


static int publish(struct mqtt_client *client, enum mqtt_qos qos)
{
        char payload[] = "{id=123}";
        char topic[] = "/things/";
        u8_t len = strlen(topic);
        struct mqtt_publish_param param;
	char pub_msg[64];
	int err;

        param.message.topic.qos = qos;
        param.message.topic.topic.utf8 = (u8_t *)topic;
        param.message.topic.topic.size = len;
        param.message.payload.data = pub_msg;
        param.message.payload.len = strlen(payload);
        param.message_id = sys_rand32_get();
        param.dup_flag = 0U;
        param.retain_flag = 0U;


	mqtt_live(client);
	next_alive = k_uptime_get() + ALIVE_TIME;

        while (1) {
                LOG_INF("Publishing data");
                sprintf(pub_msg, "%s: %d\n", "payload", param.message_id);
                param.message.payload.len = strlen(pub_msg);
                err = mqtt_publish(client, &param);
                if (err) {
                        LOG_ERR("could not publish, error %d", err);
                        break;
                }

                /* idle and process messages */
                while (k_uptime_get() < next_alive) {
                        LOG_INF("... idling ...");
                        if (wait_for_input(5000) > 0) {
                                mqtt_input(client);
                        }
                }

                mqtt_live(client);
                next_alive += ALIVE_TIME;
        }

}


static int tls_init(void)
{
        int err = -EINVAL;
	LOG_INF("Ca size:%d\n", sizeof(amazon_certificate));
        err = tls_credential_add(APP_CA_CERT_TAG, TLS_CREDENTIAL_CA_CERTIFICATE,
                                 amazon_certificate, sizeof(amazon_certificate));
        if (err < 0) {
                LOG_ERR("Failed to register public certificate: %d", err);
                return err;
        }

        err = tls_credential_add(APP_PRIVATE_SERVER_KEY_TAG, TLS_CREDENTIAL_PRIVATE_KEY,
                                 private_key, sizeof(private_key));
        if (err < 0) {
                LOG_ERR("Failed to register private key: %d", err);
                return err;
        }

        err = tls_credential_add(APP_PRIVATE_SERVER_KEY_TAG, TLS_CREDENTIAL_SERVER_CERTIFICATE,
                                 server_cert, sizeof(server_cert));
        if (err < 0) {
                LOG_ERR("Failed to register server certificate: %d", err);
                return err;
        }

	

        return err;
}


void mqtt_startup(char *hostname, int port)
{
	struct mqtt_client *client = &client_ctx;
	int retries = 5;
	int err, cnt;	
	static struct zsock_addrinfo hints;
	int res = 0;
	int rc;


	hints.ai_family = AF_INET;
	hints.ai_socktype = SOCK_STREAM;
	hints.ai_protocol = 0;
	cnt = 0;
	while ((err = getaddrinfo(CONFIG_AWS_HOSTNAME, "8883", &hints,
					&haddr)) && cnt < 3) {
		LOG_ERR("Unable to get address for broker, retrying");
		cnt++;
	}
	if (err != 0) {
		LOG_ERR("Unable to get address for broker, error %d",
				res);
		return;
	}
	LOG_INF("DNS resolved for agwgaprlb6xdl-ats.iot.us-west-2.amazonaws.com:8883");
	rc = try_to_connect(client);
	PRINT_RESULT("try_to_connect", rc);
	if (rc == 0) {
		publish(&client_ctx, MQTT_QOS_1_AT_LEAST_ONCE);
	}



}

static void show_addrinfo(struct addrinfo *addr)
{
	char hr_addr[NET_IPV6_ADDR_LEN];
	void *a;

top:
	LOG_DBG("  flags   : %d", addr->ai_flags);
	LOG_DBG("  family  : %d", addr->ai_family);
	LOG_DBG("  socktype: %d", addr->ai_socktype);
	LOG_DBG("  protocol: %d", addr->ai_protocol);
	LOG_DBG("  addrlen : %d", (int)addr->ai_addrlen);

	/* Assume two words. */
	LOG_DBG("   addr[0]: 0x%lx", ((uint32_t *)addr->ai_addr)[0]);
	LOG_DBG("   addr[1]: 0x%lx", ((uint32_t *)addr->ai_addr)[1]);

	if (addr->ai_next != 0) {
		addr = addr->ai_next;
		goto top;
	}

	a = &net_sin(addr->ai_addr)->sin_addr;
	LOG_INF("  Got %s",
			log_strdup(net_addr_ntop(addr->ai_family, a,
					hr_addr, sizeof(hr_addr))));

}




void main(void)
{
	static struct addrinfo hints;
	struct addrinfo *haddr;
	int res;
	int cnt = 0;
        int rc;


	app_dhcpv4_startup();
        rc = tls_init();
        PRINT_RESULT("tls_init", rc);
	mqtt_startup(CONFIG_AWS_HOSTNAME, 8883);	

}

@jukkar
Copy link
Member

jukkar commented Mar 11, 2020

Got the zephyr able to communicate with aws, if anyone is interested

Thanks for the example! Would you be able to submit a sample app to samples/net/clould/ for this so that it would be available also as a part of zephyr?

@md-jamal
Copy link
Author

Do we have any documentation on how to submit?

@jukkar
Copy link
Member

jukkar commented Mar 13, 2020

@md-jamal
Copy link
Author

I have created a pull request : #23469
I am doing this first time, please provide feedback and i will correct them

@rlubos
Copy link
Contributor

rlubos commented May 5, 2020

The original issue was resolved.

@rlubos rlubos closed this as completed May 5, 2020
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

4 participants