Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 32 additions & 68 deletions plugins/in_serial/in_serial.c
Original file line number Diff line number Diff line change
Expand Up @@ -69,23 +69,13 @@ void *in_serial_flush(void *in_context, int *size)
static inline int process_line(char *line, struct flb_in_serial_config *ctx)
{
int line_len;
uint64_t val;
char *p = line;
char *end = NULL;
char msg[1024];

/* Increase buffer position */
ctx->buffer_id++;

errno = 0;
val = strtol(p, &end, 10);
if ((errno == ERANGE && (val == INT_MAX || val == INT_MIN))
|| (errno != 0 && val == 0)) {
goto fail;
}

/* Now process the human readable message */

line_len = strlen(p);
strncpy(msg, p, line_len);
msg[line_len] = '\0';
Expand All @@ -107,41 +97,38 @@ static inline int process_line(char *line, struct flb_in_serial_config *ctx)
(const char *) msg);

return 0;

fail:
ctx->buffer_id--;
return -1;
}

/* Callback triggered when some serial msgs are available */
int in_serial_collect(struct flb_config *config, void *in_context)
{
int ret;
int bytes;
char line[2024];
char line[1024];
struct flb_in_serial_config *ctx = in_context;

bytes = read(ctx->fd, line, sizeof(line) - 1);
if (bytes == -1) {
if (errno == -EPIPE) {
return -1;
while (1) {
bytes = read(ctx->fd, line, sizeof(line) - 1);
if (bytes == -1) {
if (errno == -EPIPE) {
return -1;
}
return 0;
}
return 0;
}
/* Always set a delimiter to avoid buffer trash */
line[bytes - 1] = '\0';

/* Check if our buffer is full */
if (ctx->buffer_id + 1 == SERIAL_BUFFER_SIZE) {
ret = flb_engine_flush(config, &in_serial_plugin);
if (ret == -1) {
ctx->buffer_id = 0;
/* Always set a delimiter to avoid buffer trash */
line[bytes - 1] = '\0';

/* Check if our buffer is full */
if (ctx->buffer_id + 1 == SERIAL_BUFFER_SIZE) {
ret = flb_engine_flush(config, &in_serial_plugin);
if (ret == -1) {
ctx->buffer_id = 0;
}
}
}

/* Process and enqueue the received line */
process_line(line, ctx);
return 0;
/* Process and enqueue the received line */
process_line(line, ctx);
}
}

/* Init serial input */
Expand Down Expand Up @@ -179,48 +166,15 @@ int in_serial_init(struct flb_config *config)
msgpack_sbuffer_init(&ctx->mp_sbuf);
msgpack_packer_init(&ctx->mp_pck, &ctx->mp_sbuf, msgpack_sbuffer_write);

tcgetattr(fd, &ctx->tio_orig);
memset(&ctx->tio, 0, sizeof(ctx->tio));
switch (atoi(ctx->bitrate)) {
case 1200:
ctx->tio.c_cflag = B1200;
break;
case 2400:
ctx->tio.c_cflag = B2400;
break;
case 4800:
ctx->tio.c_cflag = B4800;
break;
case 9600:
ctx->tio.c_cflag = B9600;
break;
case 19200:
ctx->tio.c_cflag = B19200;
break;
case 38400:
ctx->tio.c_cflag = B38400;
break;

#ifdef __LINUX__
case 576000:
ctx->tio.c_cflag = B576000;
break;
case 115200:
ctx->tio.c_cflag = B115200;
break;
#endif

default:
flb_utils_error_c("Invalid bitrate for serial plugin");
}

ctx->tio.c_cflag = ctx->tio.c_ispeed = ctx->tio.c_ospeed = atoi(ctx->bitrate);
ctx->tio.c_cflag |= CRTSCTS | CS8 | CLOCAL | CREAD;
ctx->tio.c_iflag = IGNPAR | IGNCR;
ctx->tio.c_oflag = 0;
ctx->tio.c_lflag = ICANON;

/* open device */
fd = open(ctx->file, O_RDWR | O_NOCTTY);
fd = open(ctx->file, O_RDWR | O_NOCTTY | O_NONBLOCK);
if (fd == -1) {
perror("open");
flb_utils_error_c("Could not open serial port device");
Expand All @@ -230,11 +184,21 @@ int in_serial_init(struct flb_config *config)
tcflush(fd, TCIFLUSH);
tcsetattr(fd, TCSANOW, &ctx->tio);

#if __linux__
/* Set our collector based on a file descriptor event */
ret = flb_input_set_collector_event("serial",
in_serial_collect,
ctx->fd,
config);
#else
/* Set our collector based on a timer event */
ret = flb_input_set_collector_time("serial",
in_serial_collect,
IN_SERIAL_COLLECT_SEC,
IN_SERIAL_COLLECT_NSEC,
config);
#endif

return 0;
}

Expand Down
3 changes: 2 additions & 1 deletion plugins/in_serial/in_serial.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@
#include <stdint.h>

#define SERIAL_BUFFER_SIZE 256
#define SERIAL_USEC_PER_SEC 1000000
#define IN_SERIAL_COLLECT_SEC 1
#define IN_SERIAL_COLLECT_NSEC 0

int in_serial_start();

Expand Down
2 changes: 1 addition & 1 deletion plugins/in_xbee/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@ add_subdirectory(lib/libxbee-v3)
include_directories(lib/libxbee-v3)

set(src
in_xbee.c in_xbee_config.c)
in_xbee.c in_xbee_config.c in_xbee_iosampling.c in_xbee_utils.c)

FLB_PLUGIN(in_xbee "${src}" "xbee")
71 changes: 29 additions & 42 deletions plugins/in_xbee/in_xbee.c
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
#include <msgpack.h>

#include "in_xbee.h"
#include "in_xbee_iosampling.h"
#include "in_xbee_config.h"

/*
Expand Down Expand Up @@ -175,23 +176,6 @@ void in_xbee_cb(struct xbee *xbee, struct xbee_con *con,
}
}

/* Callback triggered by timer */
int in_xbee_collect(struct flb_config *config, void *in_context)
{
int ret = 0;
void *p = NULL;
(void) config;
struct flb_in_xbee_config *ctx = in_context;

if ((ret = xbee_conCallbackGet(ctx->con,
(xbee_t_conCallback*) &p)) != XBEE_ENONE) {
flb_debug("xbee_conCallbackGet() returned: %d", ret);
return ret;
}

return 0;
}

void *in_xbee_flush(void *in_context, int *size)
{
char *buf;
Expand Down Expand Up @@ -221,7 +205,7 @@ void *in_xbee_flush(void *in_context, int *size)
return buf;

fail:
pthread_mutex_lock(&ctx->mtx_mp);
pthread_mutex_unlock(&ctx->mtx_mp);
return NULL;
}

Expand All @@ -231,7 +215,6 @@ int in_xbee_init(struct flb_config *config)
int ret;
struct stat dev_st;
struct xbee *xbee;
struct xbee_con *con;
struct xbee_conAddress address;
struct flb_in_xbee_config *ctx;
struct xbee_conSettings settings;
Expand Down Expand Up @@ -277,6 +260,7 @@ int in_xbee_init(struct flb_config *config)

ctx->config = config;
pthread_mutex_init(&ctx->mtx_mp, NULL);
ctx->buffer_len = 0;

/* Init library */
xbee_init();
Expand All @@ -303,25 +287,44 @@ int in_xbee_init(struct flb_config *config)
xbee_logLevelSet(xbee, ctx->xbeeLogLevel);

/* Prepare a connection with the peer XBee */
if ((ret = xbee_conNew(xbee, &con, "Data", &address)) != XBEE_ENONE) {

if ((ret = xbee_conNew(xbee, &ctx->con_data, "Data", &address)) != XBEE_ENONE) {
xbee_log(xbee, -1, "xbee_conNew() returned: %d (%s)", ret, xbee_errorToStr(ret));
return ret;
}

xbee_conSettings(con, NULL, &settings);
xbee_conSettings(ctx->con_data, NULL, &settings);
settings.disableAck = ctx->xbeeDisableAck ? 1 : 0;
settings.catchAll = ctx->xbeeCatchAll ? 1 : 0;
xbee_conSettings(con, &settings, NULL);
xbee_conSettings(ctx->con_data, &settings, NULL);

ctx->con = con;
ctx->buffer_len = 0;
if ((ret = xbee_conDataSet(ctx->con_data, ctx, NULL)) != XBEE_ENONE) {
xbee_log(xbee, -1, "xbee_conDataSet() returned: %d", ret);
return ret;
}

if ((ret = xbee_conCallbackSet(ctx->con_data, in_xbee_cb, NULL)) != XBEE_ENONE) {
xbee_log(xbee, -1, "xbee_conCallbackSet() returned: %d", ret);
return ret;
}


if ((ret = xbee_conNew(xbee, &ctx->con_io, "I/O", &address)) != XBEE_ENONE) {
xbee_log(xbee, -1, "xbee_conNew() returned: %d (%s)", ret, xbee_errorToStr(ret));
return ret;
}

xbee_conSettings(ctx->con_io, NULL, &settings);
settings.disableAck = ctx->xbeeDisableAck ? 1 : 0;
settings.catchAll = ctx->xbeeCatchAll ? 1 : 0;
xbee_conSettings(ctx->con_io, &settings, NULL);

if ((ret = xbee_conDataSet(con, ctx, NULL)) != XBEE_ENONE) {
if ((ret = xbee_conDataSet(ctx->con_io, ctx, NULL)) != XBEE_ENONE) {
xbee_log(xbee, -1, "xbee_conDataSet() returned: %d", ret);
return ret;
}

if ((ret = xbee_conCallbackSet(con, in_xbee_cb, NULL)) != XBEE_ENONE) {
if ((ret = xbee_conCallbackSet(ctx->con_io, in_xbee_iosampling_cb, NULL)) != XBEE_ENONE) {
xbee_log(xbee, -1, "xbee_conCallbackSet() returned: %d", ret);
return ret;
}
Expand All @@ -332,21 +335,6 @@ int in_xbee_init(struct flb_config *config)
flb_utils_error_c("Could not set configuration for xbee input plugin");
}

/*
* Set our collector based on time. We will trigger a collection at certain
* intervals. For now it works but it's not the ideal implementation. I am
* talking with libxbee maintainer to check possible workarounds and use
* proper events mechanism.
*/
ret = flb_input_set_collector_time("xbee",
in_xbee_collect,
IN_XBEE_COLLECT_SEC,
IN_XBEE_COLLECT_NSEC,
config);
if (ret == -1) {
flb_utils_error_c("Could not set collector for xbee input plugin");
}

return 0;
}

Expand All @@ -356,6 +344,5 @@ struct flb_input_plugin in_xbee_plugin = {
.description = "XBee Device",
.cb_init = in_xbee_init,
.cb_pre_run = NULL,
.cb_collect = in_xbee_collect,
.cb_flush_buf = in_xbee_flush,
};
1 change: 1 addition & 0 deletions plugins/in_xbee/in_xbee.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#define IN_XBEE_COLLECT_SEC 0
#define IN_XBEE_COLLECT_NSEC 15000


extern struct flb_input_plugin in_xbee_plugin;

#endif
1 change: 0 additions & 1 deletion plugins/in_xbee/in_xbee_config.c
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ int in_xbee_config_read_int(int *dest, struct mk_rconf_section *section, char *k
struct flb_in_xbee_config *xbee_config_read(struct flb_in_xbee_config *config, struct mk_rconf *conf)
{
char *file = NULL;
char *baudrate = NULL;
char *xbee_mode = NULL;

struct mk_rconf_section *section;
Expand Down
3 changes: 2 additions & 1 deletion plugins/in_xbee/in_xbee_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ struct flb_in_xbee_config {
char *xbeeMode;

/* Active connection context */
struct xbee_con *con;
struct xbee_con *con_data;
struct xbee_con *con_io;

/* buffering */
int buffer_len;
Expand Down
Loading