Skip to content

Commit

Permalink
Producer/Consumer table binary support (#801)
Browse files Browse the repository at this point in the history
We want to make the Producer/Consumer table can support binary messages, The native C string (char *) will be replaced to pointers and its lengths in all paths.
Meanwhile, the Python interfaces of SWIG can only handle the type, str, with UTF-8. So, we need to specialize the SWIG interfaces from bytes of Python to string of C++.
---------

Signed-off-by: Ze Gan <ganze718@gmail.com>
Co-authored-by: Qi Luo <qiluo-msft@users.noreply.github.com>
  • Loading branch information
Pterosaur and qiluo-msft authored Jul 13, 2023
1 parent e0f394c commit 62e8d80
Show file tree
Hide file tree
Showing 11 changed files with 181 additions and 74 deletions.
6 changes: 3 additions & 3 deletions common/consumerstatetable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,16 @@ void ConsumerStateTable::pops(std::deque<KeyOpFieldsValuesTuple> &vkco, const st

auto& ctx = ctx0->element[ie];
assert(ctx->element[0]->type == REDIS_REPLY_STRING);
std::string key = ctx->element[0]->str;
std::string key(ctx->element[0]->str, ctx->element[0]->len);
kfvKey(kco) = key;

assert(ctx->element[1]->type == REDIS_REPLY_ARRAY);
auto ctx1 = ctx->element[1];
for (size_t i = 0; i < ctx1->elements / 2; i++)
{
FieldValueTuple e;
fvField(e) = ctx1->element[i * 2]->str;
fvValue(e) = ctx1->element[i * 2 + 1]->str;
fvField(e).assign(ctx1->element[i * 2]->str, ctx1->element[i * 2]->len);
fvValue(e).assign(ctx1->element[i * 2 + 1]->str, ctx1->element[i * 2 + 1]->len);
values.push_back(e);
}

Expand Down
12 changes: 2 additions & 10 deletions common/luatable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,9 @@ bool LuaTable::get(const vector<string> &luaKeys, vector<FieldValueTuple> &value
args.emplace_back(v);
}

// Transform data structure
vector<const char *> args1;
transform(args.begin(), args.end(), back_inserter(args1), [](const string &s) { return s.c_str(); } );

// Invoke redis command
RedisCommand command;
command.formatArgv((int)args1.size(), &args1[0], NULL);
command.format(args);
RedisReply r(m_db.get(), command, REDIS_REPLY_ARRAY);
redisReply *reply = r.getContext();

Expand Down Expand Up @@ -109,13 +105,9 @@ bool LuaTable::hget(const vector<string> &luaKeys, const string &field, string &
args.emplace_back(v);
}

// Transform data structure
vector<const char *> args1;
transform(args.begin(), args.end(), back_inserter(args1), [](const string &s) { return s.c_str(); } );

// Invoke redis command
RedisCommand command;
command.formatArgv((int)args1.size(), &args1[0], NULL);
command.format(args);
RedisReply r(m_db.get(), command);
redisReply *reply = r.getContext();

Expand Down
36 changes: 6 additions & 30 deletions common/producerstatetable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,9 @@ void ProducerStateTable::set(const string &key, const vector<FieldValueTuple> &v
args.emplace_back(fvValue(iv));
}

// Transform data structure
vector<const char *> args1;
transform(args.begin(), args.end(), back_inserter(args1), [](const string &s) { return s.c_str(); } );

// Invoke redis command
RedisCommand command;
command.formatArgv((int)args1.size(), &args1[0], NULL);
command.format(args);
m_pipe->push(command, REDIS_REPLY_NIL);
if (!m_buffered)
{
Expand Down Expand Up @@ -171,13 +167,9 @@ void ProducerStateTable::del(const string &key, const string &op /*= DEL_COMMAND
args.emplace_back("''");
args.emplace_back("''");

// Transform data structure
vector<const char *> args1;
transform(args.begin(), args.end(), back_inserter(args1), [](const string &s) { return s.c_str(); } );

// Invoke redis command
RedisCommand command;
command.formatArgv((int)args1.size(), &args1[0], NULL);
command.format(args);
m_pipe->push(command, REDIS_REPLY_NIL);
if (!m_buffered)
{
Expand Down Expand Up @@ -224,13 +216,9 @@ void ProducerStateTable::set(const std::vector<KeyOpFieldsValuesTuple>& values)
}
}

// Transform data structure
vector<const char *> args1;
transform(args.begin(), args.end(), back_inserter(args1), [](const string &s) { return s.c_str(); } );

// Invoke redis command
RedisCommand command;
command.formatArgv((int)args1.size(), &args1[0], NULL);
command.format(args);
m_pipe->push(command, REDIS_REPLY_NIL);
if (!m_buffered)
{
Expand Down Expand Up @@ -265,13 +253,9 @@ void ProducerStateTable::del(const std::vector<std::string>& keys)
}
args.emplace_back("G");

// Transform data structure
vector<const char *> args1;
transform(args.begin(), args.end(), back_inserter(args1), [](const string &s) { return s.c_str(); } );

// Invoke redis command
RedisCommand command;
command.formatArgv((int)args1.size(), &args1[0], NULL);
command.format(args);
m_pipe->push(command, REDIS_REPLY_NIL);
if (!m_buffered)
{
Expand Down Expand Up @@ -307,13 +291,9 @@ void ProducerStateTable::clear()
args.emplace_back(getStateHashPrefix() + getTableName());
args.emplace_back(getDelKeySetName());

// Transform data structure
vector<const char *> args1;
transform(args.begin(), args.end(), back_inserter(args1), [](const string &s) { return s.c_str(); } );

// Invoke redis command
RedisCommand cmd;
cmd.formatArgv((int)args1.size(), &args1[0], NULL);
cmd.format(args);
m_pipe->push(cmd, REDIS_REPLY_NIL);
m_pipe->flush();
}
Expand Down Expand Up @@ -466,13 +446,9 @@ void ProducerStateTable::apply_temp_view()
SWSS_LOG_DEBUG("apply_view.lua is called with following argument list: %s", ss.str().c_str());
}

// Transform data structure
vector<const char *> args1;
transform(args.begin(), args.end(), back_inserter(args1), [](const string &s) { return s.c_str(); } );

// Invoke redis command
RedisCommand command;
command.formatArgv((int)args1.size(), &args1[0], NULL);
command.format(args);
m_pipe->push(command, REDIS_REPLY_NIL);
m_pipe->flush();

Expand Down
10 changes: 1 addition & 9 deletions common/redisapi.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,16 +83,8 @@ static inline std::set<std::string> runRedisScript(RedisContext &ctx, const std:
args.insert(args.end(), argv.begin(), argv.end());
args.push_back("''");

// Convert to vector of char *
std::vector<const char *> c_args;
transform(
args.begin(),
args.end(),
std::back_inserter(c_args),
[](const std::string& s) { return s.c_str(); } );

RedisCommand command;
command.formatArgv(static_cast<int>(c_args.size()), c_args.data(), NULL);
command.format(args);

std::set<std::string> ret;
try
Expand Down
35 changes: 24 additions & 11 deletions common/rediscommand.cpp
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
#include <vector>
#include <hiredis/hiredis.h>
#include "rediscommand.h"
#include "stringutility.h"

using namespace std;

namespace swss {

RedisCommand::RedisCommand()
: temp(NULL)
: temp(NULL),
len(0)
{
}

Expand All @@ -26,7 +28,7 @@ void RedisCommand::format(const char *fmt, ...)

va_list ap;
va_start(ap, fmt);
int len = redisvFormatCommand(&temp, fmt, ap);
len = redisvFormatCommand(&temp, fmt, ap);
va_end(ap);
if (len == -1) {
throw std::bad_alloc();
Expand All @@ -43,7 +45,7 @@ void RedisCommand::formatArgv(int argc, const char **argv, const size_t *argvlen
temp = nullptr;
}

int len = redisFormatCommandArgv(&temp, argc, argv, argvlen);
len = redisFormatCommandArgv(&temp, argc, argv, argvlen);
if (len == -1) {
throw std::bad_alloc();
}
Expand All @@ -52,11 +54,13 @@ void RedisCommand::formatArgv(int argc, const char **argv, const size_t *argvlen
void RedisCommand::format(const vector<string> &commands)
{
vector<const char*> args;
vector<size_t> lens;
for (auto& command : commands)
{
args.push_back(command.c_str());
lens.push_back(command.size());
}
formatArgv(static_cast<int>(args.size()), args.data(), NULL);
formatArgv(static_cast<int>(args.size()), args.data(), lens.data());
}

/* Format HSET key multiple field value command */
Expand Down Expand Up @@ -96,12 +100,9 @@ void RedisCommand::formatHDEL(const std::string& key, const std::vector<std::str
{
if (fields.empty()) throw std::invalid_argument("empty values");

std::vector<const char *> args = {"HDEL", key.c_str()};
for (const std::string &f : fields)
{
args.push_back(f.c_str());
}
formatArgv(static_cast<int>(args.size()), args.data(), NULL);
std::vector<string> args = {"HDEL", key};
args.insert(args.end(), fields.begin(), fields.end());
format(args);
}

/* Format EXPIRE key field command */
Expand All @@ -122,14 +123,26 @@ void RedisCommand::formatDEL(const std::string& key)
return format("DEL %s", key.c_str());
}

int RedisCommand::appendTo(redisContext *ctx) const
{
return redisAppendFormattedCommand(ctx, c_str(), length());
}

std::string RedisCommand::toPrintableString() const
{
return binary_to_printable(temp, len);
}

const char *RedisCommand::c_str() const
{
return temp;
}

size_t RedisCommand::length() const
{
return strlen(temp);
if (len <= 0)
return 0;
return static_cast<size_t>(len);
}

}
16 changes: 12 additions & 4 deletions common/rediscommand.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <string>
#include <stdexcept>
#include <map>
#include <hiredis/hiredis.h>

namespace swss {

Expand All @@ -17,6 +18,7 @@ typedef std::tuple<std::string, std::string, std::vector<FieldValueTuple> > KeyO
#define kfvOp std::get<1>
#define kfvFieldsValues std::get<2>


class RedisCommand {
public:
RedisCommand();
Expand Down Expand Up @@ -64,12 +66,18 @@ class RedisCommand {
/* Format DEL key command */
void formatDEL(const std::string& key);

int appendTo(redisContext *ctx) const;

std::string toPrintableString() const;

protected:
const char *c_str() const;

size_t length() const;

private:
char *temp;
int len;
};

template<typename InputIterator>
Expand All @@ -80,15 +88,15 @@ void RedisCommand::formatHSET(const std::string &key,

const char* cmd = "HSET";

std::vector<const char*> args = { cmd, key.c_str() };
std::vector<std::string> args = { cmd, key.c_str() };

for (auto i = start; i != stop; i++)
{
args.push_back(fvField(*i).c_str());
args.push_back(fvValue(*i).c_str());
args.push_back(fvField(*i));
args.push_back(fvValue(*i));
}

formatArgv((int)args.size(), args.data(), NULL);
format(args);
}

}
2 changes: 1 addition & 1 deletion common/redispipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class RedisPipeline {
case REDIS_REPLY_STATUS:
case REDIS_REPLY_INTEGER:
{
int rc = redisAppendFormattedCommand(m_db->getContext(), command.c_str(), command.length());
int rc = command.appendTo(m_db->getContext());
if (rc != REDIS_OK)
{
// The only reason of error is REDIS_ERR_OOM (Out of memory)
Expand Down
12 changes: 6 additions & 6 deletions common/redisreply.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ inline void guard(FUNC func, const char* command)

RedisReply::RedisReply(RedisContext *ctx, const RedisCommand& command)
{
int rc = redisAppendFormattedCommand(ctx->getContext(), command.c_str(), command.length());
int rc = command.appendTo(ctx->getContext());
if (rc != REDIS_OK)
{
// The only reason of error is REDIS_ERR_OOM (Out of memory)
Expand All @@ -89,9 +89,9 @@ RedisReply::RedisReply(RedisContext *ctx, const RedisCommand& command)
rc = redisGetReply(ctx->getContext(), (void**)&m_reply);
if (rc != REDIS_OK)
{
throw RedisError("Failed to redisGetReply with " + string(command.c_str()), ctx->getContext());
throw RedisError("Failed to redisGetReply with " + command.toPrintableString(), ctx->getContext());
}
guard([&]{checkReply();}, command.c_str());
guard([&]{checkReply();}, command.toPrintableString().c_str());
}

RedisReply::RedisReply(RedisContext *ctx, const string& command)
Expand All @@ -109,19 +109,19 @@ RedisReply::RedisReply(RedisContext *ctx, const string& command)
{
throw RedisError("Failed to redisGetReply with " + command, ctx->getContext());
}
guard([&]{checkReply();}, command.c_str());
guard([&]{checkReply();}, binary_to_printable(command.c_str(), command.length()).c_str());
}

RedisReply::RedisReply(RedisContext *ctx, const RedisCommand& command, int expectedType)
: RedisReply(ctx, command)
{
guard([&]{checkReplyType(expectedType);}, command.c_str());
guard([&]{checkReplyType(expectedType);}, command.toPrintableString().c_str());
}

RedisReply::RedisReply(RedisContext *ctx, const string& command, int expectedType)
: RedisReply(ctx, command)
{
guard([&]{checkReplyType(expectedType);}, command.c_str());
guard([&]{checkReplyType(expectedType);}, binary_to_printable(command.c_str(), command.length()).c_str());
}

RedisReply::RedisReply(redisReply *reply) :
Expand Down
Loading

0 comments on commit 62e8d80

Please sign in to comment.