diff --git a/plugins/in_serial/in_serial.c b/plugins/in_serial/in_serial.c index df54cbdf8bf..f793b28fe0f 100644 --- a/plugins/in_serial/in_serial.c +++ b/plugins/in_serial/in_serial.c @@ -121,7 +121,7 @@ int in_serial_collect(struct flb_config *config, void *in_context) char line[2024]; struct flb_in_serial_config *ctx = in_context; - bytes = read(ctx->fd, line, sizeof(line) - 1); + bytes = read(ctx->fd, &line, sizeof(line) - 1); if (bytes == -1) { if (errno == -EPIPE) { return -1; @@ -179,41 +179,8 @@ int in_serial_init(struct flb_config *config) msgpack_sbuffer_init(&ctx->mp_sbuf); msgpack_packer_init(&ctx->mp_pck, &ctx->mp_sbuf, msgpack_sbuffer_write); - tcgetattr(fd, &ctx->tio_orig); memset(&ctx->tio, 0, sizeof(ctx->tio)); - switch (atoi(ctx->bitrate)) { - case 1200: - ctx->tio.c_cflag = B1200; - break; - case 2400: - ctx->tio.c_cflag = B2400; - break; - case 4800: - ctx->tio.c_cflag = B4800; - break; - case 9600: - ctx->tio.c_cflag = B9600; - break; - case 19200: - ctx->tio.c_cflag = B19200; - break; - case 38400: - ctx->tio.c_cflag = B38400; - break; - -#ifdef __LINUX__ - case 576000: - ctx->tio.c_cflag = B576000; - break; - case 115200: - ctx->tio.c_cflag = B115200; - break; -#endif - - default: - flb_utils_error_c("Invalid bitrate for serial plugin"); - } - + ctx->tio.c_cflag = atoi(ctx->bitrate); ctx->tio.c_cflag |= CRTSCTS | CS8 | CLOCAL | CREAD; ctx->tio.c_iflag = IGNPAR | IGNCR; ctx->tio.c_oflag = 0; diff --git a/plugins/in_xbee/CMakeLists.txt b/plugins/in_xbee/CMakeLists.txt index 5db75738f85..94439303275 100644 --- a/plugins/in_xbee/CMakeLists.txt +++ b/plugins/in_xbee/CMakeLists.txt @@ -3,6 +3,6 @@ add_subdirectory(lib/libxbee-v3) include_directories(lib/libxbee-v3) set(src - in_xbee.c in_xbee_config.c) + in_xbee.c in_xbee_config.c in_xbee_iosampling.c in_xbee_utils.c) FLB_PLUGIN(in_xbee "${src}" "xbee") diff --git a/plugins/in_xbee/in_xbee.c b/plugins/in_xbee/in_xbee.c index 7c9e587299c..1d8fe1327f7 100644 --- a/plugins/in_xbee/in_xbee.c +++ b/plugins/in_xbee/in_xbee.c @@ -32,6 +32,7 @@ #include #include "in_xbee.h" +#include "in_xbee_iosampling.h" #include "in_xbee_config.h" /* @@ -175,23 +176,6 @@ void in_xbee_cb(struct xbee *xbee, struct xbee_con *con, } } -/* Callback triggered by timer */ -int in_xbee_collect(struct flb_config *config, void *in_context) -{ - int ret = 0; - void *p = NULL; - (void) config; - struct flb_in_xbee_config *ctx = in_context; - - if ((ret = xbee_conCallbackGet(ctx->con, - (xbee_t_conCallback*) &p)) != XBEE_ENONE) { - flb_debug("xbee_conCallbackGet() returned: %d", ret); - return ret; - } - - return 0; -} - void *in_xbee_flush(void *in_context, int *size) { char *buf; @@ -221,7 +205,7 @@ void *in_xbee_flush(void *in_context, int *size) return buf; fail: - pthread_mutex_lock(&ctx->mtx_mp); + pthread_mutex_unlock(&ctx->mtx_mp); return NULL; } @@ -231,7 +215,6 @@ int in_xbee_init(struct flb_config *config) int ret; struct stat dev_st; struct xbee *xbee; - struct xbee_con *con; struct xbee_conAddress address; struct flb_in_xbee_config *ctx; struct xbee_conSettings settings; @@ -277,6 +260,7 @@ int in_xbee_init(struct flb_config *config) ctx->config = config; pthread_mutex_init(&ctx->mtx_mp, NULL); + ctx->buffer_len = 0; /* Init library */ xbee_init(); @@ -303,25 +287,44 @@ int in_xbee_init(struct flb_config *config) xbee_logLevelSet(xbee, ctx->xbeeLogLevel); /* Prepare a connection with the peer XBee */ - if ((ret = xbee_conNew(xbee, &con, "Data", &address)) != XBEE_ENONE) { + + if ((ret = xbee_conNew(xbee, &ctx->con_data, "Data", &address)) != XBEE_ENONE) { xbee_log(xbee, -1, "xbee_conNew() returned: %d (%s)", ret, xbee_errorToStr(ret)); return ret; } - xbee_conSettings(con, NULL, &settings); + xbee_conSettings(ctx->con_data, NULL, &settings); settings.disableAck = ctx->xbeeDisableAck ? 1 : 0; settings.catchAll = ctx->xbeeCatchAll ? 1 : 0; - xbee_conSettings(con, &settings, NULL); + xbee_conSettings(ctx->con_data, &settings, NULL); - ctx->con = con; - ctx->buffer_len = 0; + if ((ret = xbee_conDataSet(ctx->con_data, ctx, NULL)) != XBEE_ENONE) { + xbee_log(xbee, -1, "xbee_conDataSet() returned: %d", ret); + return ret; + } + + if ((ret = xbee_conCallbackSet(ctx->con_data, in_xbee_cb, NULL)) != XBEE_ENONE) { + xbee_log(xbee, -1, "xbee_conCallbackSet() returned: %d", ret); + return ret; + } + + + if ((ret = xbee_conNew(xbee, &ctx->con_io, "I/O", &address)) != XBEE_ENONE) { + xbee_log(xbee, -1, "xbee_conNew() returned: %d (%s)", ret, xbee_errorToStr(ret)); + return ret; + } + + xbee_conSettings(ctx->con_io, NULL, &settings); + settings.disableAck = ctx->xbeeDisableAck ? 1 : 0; + settings.catchAll = ctx->xbeeCatchAll ? 1 : 0; + xbee_conSettings(ctx->con_io, &settings, NULL); - if ((ret = xbee_conDataSet(con, ctx, NULL)) != XBEE_ENONE) { + if ((ret = xbee_conDataSet(ctx->con_io, ctx, NULL)) != XBEE_ENONE) { xbee_log(xbee, -1, "xbee_conDataSet() returned: %d", ret); return ret; } - if ((ret = xbee_conCallbackSet(con, in_xbee_cb, NULL)) != XBEE_ENONE) { + if ((ret = xbee_conCallbackSet(ctx->con_io, in_xbee_iosampling_cb, NULL)) != XBEE_ENONE) { xbee_log(xbee, -1, "xbee_conCallbackSet() returned: %d", ret); return ret; } @@ -332,21 +335,6 @@ int in_xbee_init(struct flb_config *config) flb_utils_error_c("Could not set configuration for xbee input plugin"); } - /* - * Set our collector based on time. We will trigger a collection at certain - * intervals. For now it works but it's not the ideal implementation. I am - * talking with libxbee maintainer to check possible workarounds and use - * proper events mechanism. - */ - ret = flb_input_set_collector_time("xbee", - in_xbee_collect, - IN_XBEE_COLLECT_SEC, - IN_XBEE_COLLECT_NSEC, - config); - if (ret == -1) { - flb_utils_error_c("Could not set collector for xbee input plugin"); - } - return 0; } @@ -356,6 +344,5 @@ struct flb_input_plugin in_xbee_plugin = { .description = "XBee Device", .cb_init = in_xbee_init, .cb_pre_run = NULL, - .cb_collect = in_xbee_collect, .cb_flush_buf = in_xbee_flush, }; diff --git a/plugins/in_xbee/in_xbee.h b/plugins/in_xbee/in_xbee.h index 1760f4d361d..4eaad87acc4 100644 --- a/plugins/in_xbee/in_xbee.h +++ b/plugins/in_xbee/in_xbee.h @@ -28,6 +28,7 @@ #define IN_XBEE_COLLECT_SEC 0 #define IN_XBEE_COLLECT_NSEC 15000 + extern struct flb_input_plugin in_xbee_plugin; #endif diff --git a/plugins/in_xbee/in_xbee_config.c b/plugins/in_xbee/in_xbee_config.c index 4d58d7cf411..7029a6a21d8 100644 --- a/plugins/in_xbee/in_xbee_config.c +++ b/plugins/in_xbee/in_xbee_config.c @@ -36,7 +36,6 @@ int in_xbee_config_read_int(int *dest, struct mk_rconf_section *section, char *k struct flb_in_xbee_config *xbee_config_read(struct flb_in_xbee_config *config, struct mk_rconf *conf) { char *file = NULL; - char *baudrate = NULL; char *xbee_mode = NULL; struct mk_rconf_section *section; diff --git a/plugins/in_xbee/in_xbee_config.h b/plugins/in_xbee/in_xbee_config.h index e207eefe142..1930a8fbdea 100644 --- a/plugins/in_xbee/in_xbee_config.h +++ b/plugins/in_xbee/in_xbee_config.h @@ -43,7 +43,8 @@ struct flb_in_xbee_config { char *xbeeMode; /* Active connection context */ - struct xbee_con *con; + struct xbee_con *con_data; + struct xbee_con *con_io; /* buffering */ int buffer_len; diff --git a/plugins/in_xbee/in_xbee_iosampling.c b/plugins/in_xbee/in_xbee_iosampling.c new file mode 100644 index 00000000000..19a8d4f57b5 --- /dev/null +++ b/plugins/in_xbee/in_xbee_iosampling.c @@ -0,0 +1,176 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015 Treasure Data Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +#include + +#include +#include +#include +#include +#include + +#include "in_xbee.h" +#include "in_xbee_iosampling.h" +#include "in_xbee_config.h" +#include "in_xbee_utils.h" + +struct xbee_ioport { + unsigned int mask; + const char *port_name; +}; + +static struct xbee_ioport digital_ports[] = { + { 1 << 0, "DIO0" }, + { 1 << 1, "DIO1" }, + { 1 << 2, "DIO2" }, + { 1 << 3, "DIO3" }, + { 1 << 4, "DIO4" }, + { 1 << 5, "DIO5" }, + { 1 << 6, "DIO6" }, + { 1 << 7, "GPIO7" }, + { 1 << 10, "DIO10" }, + { 1 << 11, "DIO11" }, + { 1 << 12, "DIO12" }, +}; + +static struct xbee_ioport analog_ports[] = { + { 1 << 0, "AD0" }, + { 1 << 1, "AD1" }, + { 1 << 2, "AD2" }, + { 1 << 3, "AD3" }, + { 1 << 7, "VCC" }, +}; + +void in_xbee_flush_if_needed(struct flb_in_xbee_config *ctx); + +/* + * returns how many datas in the iosample packet + */ +int in_xbee_iosampling_count_maps(unsigned int mask_din, unsigned int mask_ain) +{ + int i; + int map_len = 0; + for (i = 0; i < sizeof(digital_ports) / sizeof(struct xbee_ioport); i++) + if (mask_din & digital_ports[i].mask) + map_len++; + + for (i = 0; i < sizeof(analog_ports) / sizeof(struct xbee_ioport); i++) + if (mask_ain & analog_ports[i].mask) + map_len++; + + return map_len; +} + +int in_xbee_iosampling_decode_ios(struct msgpack_packer *buf, unsigned char *p, unsigned int mask_din, unsigned int mask_ain) +{ + int i; + int din; + + /* + * Digital pins data comes first. + */ + if (mask_din) { + /* sampled digital data sets */ + din = *p << 8 | *(p + 1); + p += 2; + + for (i = 0; i < sizeof(digital_ports) / sizeof(struct xbee_ioport); i++) { + struct xbee_ioport *port = &digital_ports[i]; + if (mask_din & port->mask) { + msgpack_pack_bin(buf, strlen(port->port_name)); + msgpack_pack_bin_body(buf, (char*) port->port_name, strlen(port->port_name)); + msgpack_pack_int(buf, (din & port->mask) > 0); + } + } + } + + /* + * Analog pins + */ + for (i = 0; i < sizeof(analog_ports) / sizeof(struct xbee_ioport); i++) { + struct xbee_ioport *port = &analog_ports[i]; + if (mask_ain & port->mask) { + msgpack_pack_bin(buf, strlen(port->port_name)); + msgpack_pack_bin_body(buf, (char*) port->port_name, strlen(port->port_name)); + msgpack_pack_int(buf, *p << 8 | *(p + 1)); + p += 2; + } + } + + /* + * FixMe: num of maps should match with in_xbee_iosample_count_maps() result + */ + return 1; +} + +void in_xbee_iosampling_cb(struct xbee *xbee, struct xbee_con *con, + struct xbee_pkt **pkt, void **data) +{ + struct flb_in_xbee_config *ctx = *data; + int map_len = 0; + unsigned int mask_din, mask_ain; + char source_addr[8 * 2 + 1]; + + if ((*pkt)->dataLen == 0) { + flb_debug("xbee data length too short, skip"); + return; + } + + unsigned char *p = (unsigned char*) (*pkt)->data; + + if (*p != 1) + return; + + mask_din = *(p + 1) << 8 | *(p + 2); + mask_ain = *(p + 3); + + map_len = in_xbee_iosampling_count_maps(mask_din, mask_ain); + map_len++; /* for addr field */ + + p += 4; + + in_xbee_conAddress2str((char*) &source_addr, sizeof(source_addr), &(*pkt)->address); + + flb_debug("[xbee] IO sample: mask_din=0x%x mask_ain=%x map_len=%d", mask_din, mask_ain, map_len); + + pthread_mutex_lock(&ctx->mtx_mp); + + in_xbee_flush_if_needed(ctx); + ctx->buffer_id++; + + msgpack_pack_array(&ctx->mp_pck, 2); + msgpack_pack_uint64(&ctx->mp_pck, time(NULL)); + msgpack_pack_map(&ctx->mp_pck, map_len); + + /* source address */ + msgpack_pack_bin(&ctx->mp_pck, 8); + msgpack_pack_bin_body(&ctx->mp_pck, "src_addr", 8); + msgpack_pack_bin(&ctx->mp_pck, strlen((char*) &source_addr)); + msgpack_pack_bin_body(&ctx->mp_pck, (char*) &source_addr, strlen((char*) &source_addr)); + + in_xbee_iosampling_decode_ios(&ctx->mp_pck, p, mask_din, mask_ain); + + pthread_mutex_unlock(&ctx->mtx_mp); +} diff --git a/plugins/in_xbee/in_xbee_iosampling.h b/plugins/in_xbee/in_xbee_iosampling.h new file mode 100644 index 00000000000..f8806f68a24 --- /dev/null +++ b/plugins/in_xbee/in_xbee_iosampling.h @@ -0,0 +1,26 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015 Treasure Data Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_IN_XBEE_IOSAMPLING +#define FLB_IN_XBEE_IOSAMPLING + +void in_xbee_iosampling_cb(struct xbee *xbee, struct xbee_con *con, + struct xbee_pkt **pkt, void **data); + +#endif diff --git a/plugins/in_xbee/in_xbee_utils.c b/plugins/in_xbee/in_xbee_utils.c new file mode 100644 index 00000000000..86be66d97a6 --- /dev/null +++ b/plugins/in_xbee/in_xbee_utils.c @@ -0,0 +1,67 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015 Treasure Data Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include +#include +#include +#include + +#include + +#include +#include +#include +#include +#include + +#include "in_xbee.h" +#include "in_xbee_config.h" + +int in_xbee_conAddress2str(char *buf, int size, struct xbee_conAddress *addr) { + int addr_len; + int i; + int len; + char *src; + + if (size < 1) + return -1; + + *buf = 0; + + if (addr->addr64_enabled) { + addr_len = 8; + src = (char*) &addr->addr64; + } else if (addr->addr16_enabled) { + addr_len = 1; + src = (char*) &addr->addr16; + } else { + flb_debug("xbee_conAddress has no address data?\n"); + return 0; + } + + len = 0; + for (i = 0; i < addr_len; i++) { + snprintf(buf + len, size - len, "%2.2x", *(src + i)); + len += 2; + } + + return 1; +} + diff --git a/plugins/in_xbee/in_xbee_utils.h b/plugins/in_xbee/in_xbee_utils.h new file mode 100644 index 00000000000..792be31dd98 --- /dev/null +++ b/plugins/in_xbee/in_xbee_utils.h @@ -0,0 +1,32 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015 Treasure Data Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef FLB_IN_XBEE_UTILS +#define FLB_IN_XBEE_UTILS + +#include +#include +#include +#include +#include +#include + +int in_xbee_conAddress2str(char *buf, int size, struct xbee_conAddress *addr); + +#endif