From cfde9ff71e7fb9e12a692e569c764a4a7cfc5d4c Mon Sep 17 00:00:00 2001 From: Takeshi HASEGAWA Date: Tue, 7 Jul 2015 20:02:05 +0900 Subject: [PATCH 01/10] in_serial: read() usage fix Signed-off-by: Takeshi HASEGAWA --- plugins/in_serial/in_serial.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/in_serial/in_serial.c b/plugins/in_serial/in_serial.c index df54cbdf8bf..a8fd7c46ade 100644 --- a/plugins/in_serial/in_serial.c +++ b/plugins/in_serial/in_serial.c @@ -121,7 +121,7 @@ int in_serial_collect(struct flb_config *config, void *in_context) char line[2024]; struct flb_in_serial_config *ctx = in_context; - bytes = read(ctx->fd, line, sizeof(line) - 1); + bytes = read(ctx->fd, &line, sizeof(line) - 1); if (bytes == -1) { if (errno == -EPIPE) { return -1; From 81724b852d3f66677f6fdd2aacce571aef04981f Mon Sep 17 00:00:00 2001 From: Takeshi HASEGAWA Date: Sun, 19 Jul 2015 18:05:04 +0900 Subject: [PATCH 02/10] in_serial: No longer validates bitrate parameter termios.h in some operating systems actually don't have Bxxxxxx symbols for higher bitrates. This change allows use of those. Eventually we'd like to re-introduce better input validation mechanism. Signed-off-by: Takeshi HASEGAWA --- plugins/in_serial/in_serial.c | 34 +--------------------------------- 1 file changed, 1 insertion(+), 33 deletions(-) diff --git a/plugins/in_serial/in_serial.c b/plugins/in_serial/in_serial.c index a8fd7c46ade..639ef0acb62 100644 --- a/plugins/in_serial/in_serial.c +++ b/plugins/in_serial/in_serial.c @@ -181,39 +181,7 @@ int in_serial_init(struct flb_config *config) tcgetattr(fd, &ctx->tio_orig); memset(&ctx->tio, 0, sizeof(ctx->tio)); - switch (atoi(ctx->bitrate)) { - case 1200: - ctx->tio.c_cflag = B1200; - break; - case 2400: - ctx->tio.c_cflag = B2400; - break; - case 4800: - ctx->tio.c_cflag = B4800; - break; - case 9600: - ctx->tio.c_cflag = B9600; - break; - case 19200: - ctx->tio.c_cflag = B19200; - break; - case 38400: - ctx->tio.c_cflag = B38400; - break; - -#ifdef __LINUX__ - case 576000: - ctx->tio.c_cflag = B576000; - break; - case 115200: - ctx->tio.c_cflag = B115200; - break; -#endif - - default: - flb_utils_error_c("Invalid bitrate for serial plugin"); - } - + ctx->tio.c_cflag = atoi(ctx->bitrate); ctx->tio.c_cflag |= CRTSCTS | CS8 | CLOCAL | CREAD; ctx->tio.c_iflag = IGNPAR | IGNCR; ctx->tio.c_oflag = 0; From a8b744e752cd698bd7030d19b982024c51f357c7 Mon Sep 17 00:00:00 2001 From: Takeshi HASEGAWA Date: Sun, 19 Jul 2015 18:07:35 +0900 Subject: [PATCH 03/10] in_serial: Don't save original termios Most of operating systems, expecting Linux, will restore termios if the process is terminated. The line was intended for keeping original termios before process shutdown, but we don't have a chance (callback) to recover it yet, therefore it is a useless. Signed-off-by: Takeshi HASEGAWA --- plugins/in_serial/in_serial.c | 1 - 1 file changed, 1 deletion(-) diff --git a/plugins/in_serial/in_serial.c b/plugins/in_serial/in_serial.c index 639ef0acb62..f793b28fe0f 100644 --- a/plugins/in_serial/in_serial.c +++ b/plugins/in_serial/in_serial.c @@ -179,7 +179,6 @@ int in_serial_init(struct flb_config *config) msgpack_sbuffer_init(&ctx->mp_sbuf); msgpack_packer_init(&ctx->mp_pck, &ctx->mp_sbuf, msgpack_sbuffer_write); - tcgetattr(fd, &ctx->tio_orig); memset(&ctx->tio, 0, sizeof(ctx->tio)); ctx->tio.c_cflag = atoi(ctx->bitrate); ctx->tio.c_cflag |= CRTSCTS | CS8 | CLOCAL | CREAD; From 50dfd6618cb7c0a04e7774b8097a80fcfbe5323e Mon Sep 17 00:00:00 2001 From: Takeshi HASEGAWA Date: Thu, 2 Jul 2015 23:36:28 +0900 Subject: [PATCH 04/10] in_xbee: Fixed deadlock in caused by in_xbee_flush() Signed-off-by: Takeshi HASEGAWA --- plugins/in_xbee/in_xbee.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugins/in_xbee/in_xbee.c b/plugins/in_xbee/in_xbee.c index 7c9e587299c..a46f45ea49c 100644 --- a/plugins/in_xbee/in_xbee.c +++ b/plugins/in_xbee/in_xbee.c @@ -221,7 +221,7 @@ void *in_xbee_flush(void *in_context, int *size) return buf; fail: - pthread_mutex_lock(&ctx->mtx_mp); + pthread_mutex_unlock(&ctx->mtx_mp); return NULL; } From fe983f2d9ffc4e68e1124482d0eb681baff3b3b0 Mon Sep 17 00:00:00 2001 From: Takeshi HASEGAWA Date: Thu, 2 Jul 2015 23:39:53 +0900 Subject: [PATCH 05/10] in_xbee: removed in_xbee_collect() As of today, in_xbee_collect() is not used. Signed-off-by: Takeshi HASEGAWWA --- plugins/in_xbee/in_xbee.c | 33 --------------------------------- 1 file changed, 33 deletions(-) diff --git a/plugins/in_xbee/in_xbee.c b/plugins/in_xbee/in_xbee.c index a46f45ea49c..98510d66442 100644 --- a/plugins/in_xbee/in_xbee.c +++ b/plugins/in_xbee/in_xbee.c @@ -175,23 +175,6 @@ void in_xbee_cb(struct xbee *xbee, struct xbee_con *con, } } -/* Callback triggered by timer */ -int in_xbee_collect(struct flb_config *config, void *in_context) -{ - int ret = 0; - void *p = NULL; - (void) config; - struct flb_in_xbee_config *ctx = in_context; - - if ((ret = xbee_conCallbackGet(ctx->con, - (xbee_t_conCallback*) &p)) != XBEE_ENONE) { - flb_debug("xbee_conCallbackGet() returned: %d", ret); - return ret; - } - - return 0; -} - void *in_xbee_flush(void *in_context, int *size) { char *buf; @@ -332,21 +315,6 @@ int in_xbee_init(struct flb_config *config) flb_utils_error_c("Could not set configuration for xbee input plugin"); } - /* - * Set our collector based on time. We will trigger a collection at certain - * intervals. For now it works but it's not the ideal implementation. I am - * talking with libxbee maintainer to check possible workarounds and use - * proper events mechanism. - */ - ret = flb_input_set_collector_time("xbee", - in_xbee_collect, - IN_XBEE_COLLECT_SEC, - IN_XBEE_COLLECT_NSEC, - config); - if (ret == -1) { - flb_utils_error_c("Could not set collector for xbee input plugin"); - } - return 0; } @@ -356,6 +324,5 @@ struct flb_input_plugin in_xbee_plugin = { .description = "XBee Device", .cb_init = in_xbee_init, .cb_pre_run = NULL, - .cb_collect = in_xbee_collect, .cb_flush_buf = in_xbee_flush, }; From 3bb1b1a69fd067ed276a35866e817e27dfca387c Mon Sep 17 00:00:00 2001 From: Takeshi HASEGAWA Date: Thu, 2 Jul 2015 23:31:19 +0900 Subject: [PATCH 06/10] in_xbee: Periodic I/O sampling support Signed-off-by: Takeshi HASEGAWA --- plugins/in_xbee/in_xbee.c | 134 ++++++++++++++++++++++++++++--- plugins/in_xbee/in_xbee_config.h | 3 +- 2 files changed, 126 insertions(+), 11 deletions(-) diff --git a/plugins/in_xbee/in_xbee.c b/plugins/in_xbee/in_xbee.c index 98510d66442..f4c65c49133 100644 --- a/plugins/in_xbee/in_xbee.c +++ b/plugins/in_xbee/in_xbee.c @@ -170,9 +170,104 @@ void in_xbee_cb(struct xbee *xbee, struct xbee_con *con, printf("\n"); #endif - if (! in_xbee_rx_queue_msgpack(ctx, (const char*) (*pkt)->data, (*pkt)->dataLen)) { - in_xbee_rx_queue_raw(ctx, (const char*) (*pkt)->data, (*pkt)->dataLen); +} + +void in_xbee_io_sample_cb(struct xbee *xbee, struct xbee_con *con, + struct xbee_pkt **pkt, void **data) +{ + struct flb_in_xbee_config *ctx; + int i; + int map_len = 0; + unsigned int mask_din, mask_ain; + + + if ((*pkt)->dataLen == 0) { + flb_debug("xbee data length too short, skip"); + return; + } + + ctx = *data; + + unsigned char *p = (unsigned char*) (*pkt)->data; + + if (*p != 1) + return; + + mask_din = *(p + 1) << 8 | *(p + 2); + mask_ain = *(p + 3); + + for (i = 0; i < 15; i++) { + if (mask_din & (1 << i)) + map_len++; + if (mask_ain & (1 << i)) + map_len++; } + + p += 4; + + flb_debug("[xbee] IO sample: mask_din=0x%x mask_ain=%x map_len=%d", mask_din, mask_ain, map_len); + + pthread_mutex_lock(&ctx->mtx_mp); + + in_xbee_flush_if_needed(ctx); + ctx->buffer_id++; + + msgpack_pack_array(&ctx->mp_pck, 2); + msgpack_pack_uint64(&ctx->mp_pck, time(NULL)); + msgpack_pack_map(&ctx->mp_pck, map_len); + + if (mask_din) { + /* sampled digital data sets */ + int din = *p << 8 | *(p + 1); + p += 2; + + for (i = 0; i < 15; i++) { + if (mask_din & (1 << i)) { + char name[6]; + snprintf((char*) &name, sizeof(name), "DIO%d", i); + + msgpack_pack_bin(&ctx->mp_pck, strlen((char*) &name)); + msgpack_pack_bin_body(&ctx->mp_pck, (char*) &name, strlen((char*) &name)); + msgpack_pack_int(&ctx->mp_pck, (din & (1 << i)) > 0); + } + } + } + + if (mask_ain & 0x01) { + msgpack_pack_bin(&ctx->mp_pck, 3); + msgpack_pack_bin_body(&ctx->mp_pck, "AD0", 3); + msgpack_pack_int(&ctx->mp_pck, *p << 8 | *(p + 1)); + p += 2; + } + + if (mask_ain & 0x02) { + msgpack_pack_bin(&ctx->mp_pck, 3); + msgpack_pack_bin_body(&ctx->mp_pck, "AD1", 3); + msgpack_pack_int(&ctx->mp_pck, *p << 8 | *(p + 1)); + p += 2; + } + + if (mask_ain & 0x04) { + msgpack_pack_bin(&ctx->mp_pck, 3); + msgpack_pack_bin_body(&ctx->mp_pck, "AD2", 3); + msgpack_pack_int(&ctx->mp_pck, *p << 8 | *(p + 1)); + p += 2; + } + + if (mask_ain & 0x08) { + msgpack_pack_bin(&ctx->mp_pck, 3); + msgpack_pack_bin_body(&ctx->mp_pck, "AD3", 3); + msgpack_pack_int(&ctx->mp_pck, *p << 8 | *(p + 1)); + p += 2; + } + + if (mask_ain & 0x80) { + msgpack_pack_bin(&ctx->mp_pck, 3); + msgpack_pack_bin_body(&ctx->mp_pck, "VCC", 3); + msgpack_pack_int(&ctx->mp_pck, *p << 8 | *(p + 1)); + p += 2; + } + pthread_mutex_unlock(&ctx->mtx_mp); } void *in_xbee_flush(void *in_context, int *size) @@ -214,7 +309,6 @@ int in_xbee_init(struct flb_config *config) int ret; struct stat dev_st; struct xbee *xbee; - struct xbee_con *con; struct xbee_conAddress address; struct flb_in_xbee_config *ctx; struct xbee_conSettings settings; @@ -260,6 +354,7 @@ int in_xbee_init(struct flb_config *config) ctx->config = config; pthread_mutex_init(&ctx->mtx_mp, NULL); + ctx->buffer_len = 0; /* Init library */ xbee_init(); @@ -286,25 +381,44 @@ int in_xbee_init(struct flb_config *config) xbee_logLevelSet(xbee, ctx->xbeeLogLevel); /* Prepare a connection with the peer XBee */ - if ((ret = xbee_conNew(xbee, &con, "Data", &address)) != XBEE_ENONE) { + + if ((ret = xbee_conNew(xbee, &ctx->con_data, "Data", &address)) != XBEE_ENONE) { xbee_log(xbee, -1, "xbee_conNew() returned: %d (%s)", ret, xbee_errorToStr(ret)); return ret; } - xbee_conSettings(con, NULL, &settings); + xbee_conSettings(ctx->con_data, NULL, &settings); settings.disableAck = ctx->xbeeDisableAck ? 1 : 0; settings.catchAll = ctx->xbeeCatchAll ? 1 : 0; - xbee_conSettings(con, &settings, NULL); + xbee_conSettings(ctx->con_data, &settings, NULL); - ctx->con = con; - ctx->buffer_len = 0; + if ((ret = xbee_conDataSet(ctx->con_data, ctx, NULL)) != XBEE_ENONE) { + xbee_log(xbee, -1, "xbee_conDataSet() returned: %d", ret); + return ret; + } + + if ((ret = xbee_conCallbackSet(ctx->con_data, in_xbee_cb, NULL)) != XBEE_ENONE) { + xbee_log(xbee, -1, "xbee_conCallbackSet() returned: %d", ret); + return ret; + } + + + if ((ret = xbee_conNew(xbee, &ctx->con_io, "I/O", &address)) != XBEE_ENONE) { + xbee_log(xbee, -1, "xbee_conNew() returned: %d (%s)", ret, xbee_errorToStr(ret)); + return ret; + } + + xbee_conSettings(ctx->con_io, NULL, &settings); + settings.disableAck = ctx->xbeeDisableAck ? 1 : 0; + settings.catchAll = ctx->xbeeCatchAll ? 1 : 0; + xbee_conSettings(ctx->con_io, &settings, NULL); - if ((ret = xbee_conDataSet(con, ctx, NULL)) != XBEE_ENONE) { + if ((ret = xbee_conDataSet(ctx->con_io, ctx, NULL)) != XBEE_ENONE) { xbee_log(xbee, -1, "xbee_conDataSet() returned: %d", ret); return ret; } - if ((ret = xbee_conCallbackSet(con, in_xbee_cb, NULL)) != XBEE_ENONE) { + if ((ret = xbee_conCallbackSet(ctx->con_io, in_xbee_io_sample_cb, NULL)) != XBEE_ENONE) { xbee_log(xbee, -1, "xbee_conCallbackSet() returned: %d", ret); return ret; } diff --git a/plugins/in_xbee/in_xbee_config.h b/plugins/in_xbee/in_xbee_config.h index e207eefe142..1930a8fbdea 100644 --- a/plugins/in_xbee/in_xbee_config.h +++ b/plugins/in_xbee/in_xbee_config.h @@ -43,7 +43,8 @@ struct flb_in_xbee_config { char *xbeeMode; /* Active connection context */ - struct xbee_con *con; + struct xbee_con *con_data; + struct xbee_con *con_io; /* buffering */ int buffer_len; From 5e1def1793ab15016c6425722bf0f6ddf2cf31ad Mon Sep 17 00:00:00 2001 From: Takeshi HASEGAWA Date: Mon, 20 Jul 2015 02:55:55 +0900 Subject: [PATCH 07/10] in_xbee: Refactored IO sampling implementaion - Splited IO sampling subroutines to in_xbee_iosampling.[ch] - Introduced available ioports lists to implement XBee API more accurately Signed-off-by: Takeshi HASEGAWA --- plugins/in_xbee/CMakeLists.txt | 2 +- plugins/in_xbee/in_xbee.c | 101 +--------------- plugins/in_xbee/in_xbee.h | 1 + plugins/in_xbee/in_xbee_iosampling.c | 166 +++++++++++++++++++++++++++ plugins/in_xbee/in_xbee_iosampling.h | 26 +++++ 5 files changed, 196 insertions(+), 100 deletions(-) create mode 100644 plugins/in_xbee/in_xbee_iosampling.c create mode 100644 plugins/in_xbee/in_xbee_iosampling.h diff --git a/plugins/in_xbee/CMakeLists.txt b/plugins/in_xbee/CMakeLists.txt index 5db75738f85..21ba144c2ae 100644 --- a/plugins/in_xbee/CMakeLists.txt +++ b/plugins/in_xbee/CMakeLists.txt @@ -3,6 +3,6 @@ add_subdirectory(lib/libxbee-v3) include_directories(lib/libxbee-v3) set(src - in_xbee.c in_xbee_config.c) + in_xbee.c in_xbee_config.c in_xbee_iosampling.c) FLB_PLUGIN(in_xbee "${src}" "xbee") diff --git a/plugins/in_xbee/in_xbee.c b/plugins/in_xbee/in_xbee.c index f4c65c49133..d7e69bb19e8 100644 --- a/plugins/in_xbee/in_xbee.c +++ b/plugins/in_xbee/in_xbee.c @@ -32,6 +32,7 @@ #include #include "in_xbee.h" +#include "in_xbee_iosampling.h" #include "in_xbee_config.h" /* @@ -172,104 +173,6 @@ void in_xbee_cb(struct xbee *xbee, struct xbee_con *con, } -void in_xbee_io_sample_cb(struct xbee *xbee, struct xbee_con *con, - struct xbee_pkt **pkt, void **data) -{ - struct flb_in_xbee_config *ctx; - int i; - int map_len = 0; - unsigned int mask_din, mask_ain; - - - if ((*pkt)->dataLen == 0) { - flb_debug("xbee data length too short, skip"); - return; - } - - ctx = *data; - - unsigned char *p = (unsigned char*) (*pkt)->data; - - if (*p != 1) - return; - - mask_din = *(p + 1) << 8 | *(p + 2); - mask_ain = *(p + 3); - - for (i = 0; i < 15; i++) { - if (mask_din & (1 << i)) - map_len++; - if (mask_ain & (1 << i)) - map_len++; - } - - p += 4; - - flb_debug("[xbee] IO sample: mask_din=0x%x mask_ain=%x map_len=%d", mask_din, mask_ain, map_len); - - pthread_mutex_lock(&ctx->mtx_mp); - - in_xbee_flush_if_needed(ctx); - ctx->buffer_id++; - - msgpack_pack_array(&ctx->mp_pck, 2); - msgpack_pack_uint64(&ctx->mp_pck, time(NULL)); - msgpack_pack_map(&ctx->mp_pck, map_len); - - if (mask_din) { - /* sampled digital data sets */ - int din = *p << 8 | *(p + 1); - p += 2; - - for (i = 0; i < 15; i++) { - if (mask_din & (1 << i)) { - char name[6]; - snprintf((char*) &name, sizeof(name), "DIO%d", i); - - msgpack_pack_bin(&ctx->mp_pck, strlen((char*) &name)); - msgpack_pack_bin_body(&ctx->mp_pck, (char*) &name, strlen((char*) &name)); - msgpack_pack_int(&ctx->mp_pck, (din & (1 << i)) > 0); - } - } - } - - if (mask_ain & 0x01) { - msgpack_pack_bin(&ctx->mp_pck, 3); - msgpack_pack_bin_body(&ctx->mp_pck, "AD0", 3); - msgpack_pack_int(&ctx->mp_pck, *p << 8 | *(p + 1)); - p += 2; - } - - if (mask_ain & 0x02) { - msgpack_pack_bin(&ctx->mp_pck, 3); - msgpack_pack_bin_body(&ctx->mp_pck, "AD1", 3); - msgpack_pack_int(&ctx->mp_pck, *p << 8 | *(p + 1)); - p += 2; - } - - if (mask_ain & 0x04) { - msgpack_pack_bin(&ctx->mp_pck, 3); - msgpack_pack_bin_body(&ctx->mp_pck, "AD2", 3); - msgpack_pack_int(&ctx->mp_pck, *p << 8 | *(p + 1)); - p += 2; - } - - if (mask_ain & 0x08) { - msgpack_pack_bin(&ctx->mp_pck, 3); - msgpack_pack_bin_body(&ctx->mp_pck, "AD3", 3); - msgpack_pack_int(&ctx->mp_pck, *p << 8 | *(p + 1)); - p += 2; - } - - if (mask_ain & 0x80) { - msgpack_pack_bin(&ctx->mp_pck, 3); - msgpack_pack_bin_body(&ctx->mp_pck, "VCC", 3); - msgpack_pack_int(&ctx->mp_pck, *p << 8 | *(p + 1)); - p += 2; - } - pthread_mutex_unlock(&ctx->mtx_mp); -} - void *in_xbee_flush(void *in_context, int *size) { char *buf; @@ -418,7 +321,7 @@ int in_xbee_init(struct flb_config *config) return ret; } - if ((ret = xbee_conCallbackSet(ctx->con_io, in_xbee_io_sample_cb, NULL)) != XBEE_ENONE) { + if ((ret = xbee_conCallbackSet(ctx->con_io, in_xbee_iosampling_cb, NULL)) != XBEE_ENONE) { xbee_log(xbee, -1, "xbee_conCallbackSet() returned: %d", ret); return ret; } diff --git a/plugins/in_xbee/in_xbee.h b/plugins/in_xbee/in_xbee.h index 1760f4d361d..4eaad87acc4 100644 --- a/plugins/in_xbee/in_xbee.h +++ b/plugins/in_xbee/in_xbee.h @@ -28,6 +28,7 @@ #define IN_XBEE_COLLECT_SEC 0 #define IN_XBEE_COLLECT_NSEC 15000 + extern struct flb_input_plugin in_xbee_plugin; #endif diff --git a/plugins/in_xbee/in_xbee_iosampling.c b/plugins/in_xbee/in_xbee_iosampling.c new file mode 100644 index 00000000000..a10c73fbf55 --- /dev/null +++ b/plugins/in_xbee/in_xbee_iosampling.c @@ -0,0 +1,166 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015 Treasure Data Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +#include + +#include +#include +#include +#include +#include + +#include "in_xbee.h" +#include "in_xbee_iosampling.h" +#include "in_xbee_config.h" + +struct xbee_ioport { + unsigned int mask; + const char *port_name; +}; + +static struct xbee_ioport digital_ports[] = { + { 1 << 0, "DIO0" }, + { 1 << 1, "DIO1" }, + { 1 << 2, "DIO2" }, + { 1 << 3, "DIO3" }, + { 1 << 4, "DIO4" }, + { 1 << 5, "DIO5" }, + { 1 << 6, "DIO6" }, + { 1 << 7, "GPIO7" }, + { 1 << 10, "DIO10" }, + { 1 << 11, "DIO11" }, + { 1 << 12, "DIO12" }, +}; + +static struct xbee_ioport analog_ports[] = { + { 1 << 0, "AD0" }, + { 1 << 1, "AD1" }, + { 1 << 2, "AD2" }, + { 1 << 3, "AD3" }, + { 1 << 7, "VCC" }, +}; + +void in_xbee_flush_if_needed(struct flb_in_xbee_config *ctx); + +/* + * returns how many datas in the iosample packet + */ +int in_xbee_iosampling_count_maps(unsigned int mask_din, unsigned int mask_ain) +{ + int i; + int map_len = 0; + for (i = 0; i < sizeof(digital_ports) / sizeof(struct xbee_ioport); i++) + if (mask_din & digital_ports[i].mask) + map_len++; + + for (i = 0; i < sizeof(analog_ports) / sizeof(struct xbee_ioport); i++) + if (mask_ain & analog_ports[i].mask) + map_len++; + + return map_len; +} + +int in_xbee_iosampling_decode_ios(struct msgpack_packer *buf, unsigned char *p, unsigned int mask_din, unsigned int mask_ain) +{ + int i; + int din, ain; + + /* + * Digital pins data comes first. + */ + if (mask_din) { + /* sampled digital data sets */ + din = *p << 8 | *(p + 1); + p += 2; + + for (i = 0; i < sizeof(digital_ports) / sizeof(struct xbee_ioport); i++) { + struct xbee_ioport *port = &digital_ports[i]; + if (mask_din & port->mask) { + msgpack_pack_bin(buf, strlen(port->port_name)); + msgpack_pack_bin_body(buf, (char*) port->port_name, strlen(port->port_name)); + msgpack_pack_int(buf, (din & port->mask) > 0); + } + } + } + + /* + * Analog pins + */ + for (i = 0; i < sizeof(analog_ports) / sizeof(struct xbee_ioport); i++) { + struct xbee_ioport *port = &analog_ports[i]; + if (mask_ain & port->mask) { + msgpack_pack_bin(buf, strlen(port->port_name)); + msgpack_pack_bin_body(buf, (char*) port->port_name, strlen(port->port_name)); + msgpack_pack_int(buf, *p << 8 | *(p + 1)); + p += 2; + } + } + + /* + * FixMe: num of maps should match with in_xbee_iosample_count_maps() result + */ + return 1; +} + +void in_xbee_iosampling_cb(struct xbee *xbee, struct xbee_con *con, + struct xbee_pkt **pkt, void **data) +{ + struct flb_in_xbee_config *ctx = *data; + int i; + int map_len = 0; + unsigned int mask_din, mask_ain; + + if ((*pkt)->dataLen == 0) { + flb_debug("xbee data length too short, skip"); + return; + } + + unsigned char *p = (unsigned char*) (*pkt)->data; + + if (*p != 1) + return; + + mask_din = *(p + 1) << 8 | *(p + 2); + mask_ain = *(p + 3); + + map_len = in_xbee_iosampling_count_maps(mask_din, mask_ain); + + p += 4; + + flb_debug("[xbee] IO sample: mask_din=0x%x mask_ain=%x map_len=%d", mask_din, mask_ain, map_len); + + pthread_mutex_lock(&ctx->mtx_mp); + + in_xbee_flush_if_needed(ctx); + ctx->buffer_id++; + + msgpack_pack_array(&ctx->mp_pck, 2); + msgpack_pack_uint64(&ctx->mp_pck, time(NULL)); + msgpack_pack_map(&ctx->mp_pck, map_len); + + in_xbee_iosampling_decode_ios(&ctx->mp_pck, p, mask_din, mask_ain); + + pthread_mutex_unlock(&ctx->mtx_mp); +} diff --git a/plugins/in_xbee/in_xbee_iosampling.h b/plugins/in_xbee/in_xbee_iosampling.h new file mode 100644 index 00000000000..f8806f68a24 --- /dev/null +++ b/plugins/in_xbee/in_xbee_iosampling.h @@ -0,0 +1,26 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015 Treasure Data Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_IN_XBEE_IOSAMPLING +#define FLB_IN_XBEE_IOSAMPLING + +void in_xbee_iosampling_cb(struct xbee *xbee, struct xbee_con *con, + struct xbee_pkt **pkt, void **data); + +#endif From f819378b7d106936919a19b164d458c928c1dd5d Mon Sep 17 00:00:00 2001 From: Takeshi HASEGAWA Date: Mon, 20 Jul 2015 03:14:33 +0900 Subject: [PATCH 08/10] in_xbee: added source address data to XBee I/O sampling data Signed-off-by: Takeshi HASEGAWA --- plugins/in_xbee/CMakeLists.txt | 2 +- plugins/in_xbee/in_xbee_iosampling.c | 11 +++++ plugins/in_xbee/in_xbee_utils.c | 67 ++++++++++++++++++++++++++++ plugins/in_xbee/in_xbee_utils.h | 32 +++++++++++++ 4 files changed, 111 insertions(+), 1 deletion(-) create mode 100644 plugins/in_xbee/in_xbee_utils.c create mode 100644 plugins/in_xbee/in_xbee_utils.h diff --git a/plugins/in_xbee/CMakeLists.txt b/plugins/in_xbee/CMakeLists.txt index 21ba144c2ae..94439303275 100644 --- a/plugins/in_xbee/CMakeLists.txt +++ b/plugins/in_xbee/CMakeLists.txt @@ -3,6 +3,6 @@ add_subdirectory(lib/libxbee-v3) include_directories(lib/libxbee-v3) set(src - in_xbee.c in_xbee_config.c in_xbee_iosampling.c) + in_xbee.c in_xbee_config.c in_xbee_iosampling.c in_xbee_utils.c) FLB_PLUGIN(in_xbee "${src}" "xbee") diff --git a/plugins/in_xbee/in_xbee_iosampling.c b/plugins/in_xbee/in_xbee_iosampling.c index a10c73fbf55..287cd148b01 100644 --- a/plugins/in_xbee/in_xbee_iosampling.c +++ b/plugins/in_xbee/in_xbee_iosampling.c @@ -34,6 +34,7 @@ #include "in_xbee.h" #include "in_xbee_iosampling.h" #include "in_xbee_config.h" +#include "in_xbee_utils.h" struct xbee_ioport { unsigned int mask; @@ -131,6 +132,7 @@ void in_xbee_iosampling_cb(struct xbee *xbee, struct xbee_con *con, int i; int map_len = 0; unsigned int mask_din, mask_ain; + char source_addr[8 * 2 + 1]; if ((*pkt)->dataLen == 0) { flb_debug("xbee data length too short, skip"); @@ -146,9 +148,12 @@ void in_xbee_iosampling_cb(struct xbee *xbee, struct xbee_con *con, mask_ain = *(p + 3); map_len = in_xbee_iosampling_count_maps(mask_din, mask_ain); + map_len++; /* for addr field */ p += 4; + in_xbee_conAddress2str((char*) &source_addr, sizeof(source_addr), &(*pkt)->address); + flb_debug("[xbee] IO sample: mask_din=0x%x mask_ain=%x map_len=%d", mask_din, mask_ain, map_len); pthread_mutex_lock(&ctx->mtx_mp); @@ -160,6 +165,12 @@ void in_xbee_iosampling_cb(struct xbee *xbee, struct xbee_con *con, msgpack_pack_uint64(&ctx->mp_pck, time(NULL)); msgpack_pack_map(&ctx->mp_pck, map_len); + /* source address */ + msgpack_pack_bin(&ctx->mp_pck, 8); + msgpack_pack_bin_body(&ctx->mp_pck, "src_addr", 8); + msgpack_pack_bin(&ctx->mp_pck, strlen((char*) &source_addr)); + msgpack_pack_bin_body(&ctx->mp_pck, (char*) &source_addr, strlen((char*) &source_addr)); + in_xbee_iosampling_decode_ios(&ctx->mp_pck, p, mask_din, mask_ain); pthread_mutex_unlock(&ctx->mtx_mp); diff --git a/plugins/in_xbee/in_xbee_utils.c b/plugins/in_xbee/in_xbee_utils.c new file mode 100644 index 00000000000..86be66d97a6 --- /dev/null +++ b/plugins/in_xbee/in_xbee_utils.c @@ -0,0 +1,67 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015 Treasure Data Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +#include + +#include +#include +#include +#include +#include + +#include "in_xbee.h" +#include "in_xbee_config.h" + +int in_xbee_conAddress2str(char *buf, int size, struct xbee_conAddress *addr) { + int addr_len; + int i; + int len; + char *src; + + if (size < 1) + return -1; + + *buf = 0; + + if (addr->addr64_enabled) { + addr_len = 8; + src = (char*) &addr->addr64; + } else if (addr->addr16_enabled) { + addr_len = 1; + src = (char*) &addr->addr16; + } else { + flb_debug("xbee_conAddress has no address data?\n"); + return 0; + } + + len = 0; + for (i = 0; i < addr_len; i++) { + snprintf(buf + len, size - len, "%2.2x", *(src + i)); + len += 2; + } + + return 1; +} + diff --git a/plugins/in_xbee/in_xbee_utils.h b/plugins/in_xbee/in_xbee_utils.h new file mode 100644 index 00000000000..792be31dd98 --- /dev/null +++ b/plugins/in_xbee/in_xbee_utils.h @@ -0,0 +1,32 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015 Treasure Data Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_IN_XBEE_UTILS +#define FLB_IN_XBEE_UTILS + +#include +#include +#include +#include +#include +#include + +int in_xbee_conAddress2str(char *buf, int size, struct xbee_conAddress *addr); + +#endif From aa6747c9f215330e6264131d80072bec4a91a81a Mon Sep 17 00:00:00 2001 From: Takeshi HASEGAWA Date: Mon, 20 Jul 2015 03:41:25 +0900 Subject: [PATCH 09/10] in_xbee: Fixed callback handler bug introduced in commit 3068ebed4aefa1346c1a054c268d8d81af894071 Signed-off-by: Takeshi HASEGAWA --- plugins/in_xbee/in_xbee.c | 3 +++ 1 file changed, 3 insertions(+) diff --git a/plugins/in_xbee/in_xbee.c b/plugins/in_xbee/in_xbee.c index d7e69bb19e8..1d8fe1327f7 100644 --- a/plugins/in_xbee/in_xbee.c +++ b/plugins/in_xbee/in_xbee.c @@ -171,6 +171,9 @@ void in_xbee_cb(struct xbee *xbee, struct xbee_con *con, printf("\n"); #endif + if (! in_xbee_rx_queue_msgpack(ctx, (const char*) (*pkt)->data, (*pkt)->dataLen)) { + in_xbee_rx_queue_raw(ctx, (const char*) (*pkt)->data, (*pkt)->dataLen); + } } void *in_xbee_flush(void *in_context, int *size) From c3d04499f0392f0e07d8f8732645278460230e93 Mon Sep 17 00:00:00 2001 From: Takeshi HASEGAWA Date: Mon, 20 Jul 2015 03:49:04 +0900 Subject: [PATCH 10/10] in_xbee: several warning/styling cleanups Signed-off-by: Takeshi HASEGAWA --- plugins/in_xbee/in_xbee_config.c | 1 - plugins/in_xbee/in_xbee_iosampling.c | 5 ++--- 2 files changed, 2 insertions(+), 4 deletions(-) diff --git a/plugins/in_xbee/in_xbee_config.c b/plugins/in_xbee/in_xbee_config.c index 4d58d7cf411..7029a6a21d8 100644 --- a/plugins/in_xbee/in_xbee_config.c +++ b/plugins/in_xbee/in_xbee_config.c @@ -36,7 +36,6 @@ int in_xbee_config_read_int(int *dest, struct mk_rconf_section *section, char *k struct flb_in_xbee_config *xbee_config_read(struct flb_in_xbee_config *config, struct mk_rconf *conf) { char *file = NULL; - char *baudrate = NULL; char *xbee_mode = NULL; struct mk_rconf_section *section; diff --git a/plugins/in_xbee/in_xbee_iosampling.c b/plugins/in_xbee/in_xbee_iosampling.c index 287cd148b01..19a8d4f57b5 100644 --- a/plugins/in_xbee/in_xbee_iosampling.c +++ b/plugins/in_xbee/in_xbee_iosampling.c @@ -86,7 +86,7 @@ int in_xbee_iosampling_count_maps(unsigned int mask_din, unsigned int mask_ain) int in_xbee_iosampling_decode_ios(struct msgpack_packer *buf, unsigned char *p, unsigned int mask_din, unsigned int mask_ain) { int i; - int din, ain; + int din; /* * Digital pins data comes first. @@ -129,8 +129,7 @@ void in_xbee_iosampling_cb(struct xbee *xbee, struct xbee_con *con, struct xbee_pkt **pkt, void **data) { struct flb_in_xbee_config *ctx = *data; - int i; - int map_len = 0; + int map_len = 0; unsigned int mask_din, mask_ain; char source_addr[8 * 2 + 1];