Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Producer/Consumer table binary support #801

Merged
merged 14 commits into from
Jul 13, 2023
6 changes: 3 additions & 3 deletions common/consumerstatetable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,16 @@ void ConsumerStateTable::pops(std::deque<KeyOpFieldsValuesTuple> &vkco, const st

auto& ctx = ctx0->element[ie];
assert(ctx->element[0]->type == REDIS_REPLY_STRING);
std::string key = ctx->element[0]->str;
std::string key(ctx->element[0]->str, ctx->element[0]->len);
kfvKey(kco) = key;

assert(ctx->element[1]->type == REDIS_REPLY_ARRAY);
auto ctx1 = ctx->element[1];
for (size_t i = 0; i < ctx1->elements / 2; i++)
{
FieldValueTuple e;
fvField(e) = ctx1->element[i * 2]->str;
fvValue(e) = ctx1->element[i * 2 + 1]->str;
fvField(e).assign(ctx1->element[i * 2]->str, ctx1->element[i * 2]->len);
fvValue(e).assign(ctx1->element[i * 2 + 1]->str, ctx1->element[i * 2 + 1]->len);
values.push_back(e);
}

Expand Down
12 changes: 2 additions & 10 deletions common/luatable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,9 @@ bool LuaTable::get(const vector<string> &luaKeys, vector<FieldValueTuple> &value
args.emplace_back(v);
}

// Transform data structure
vector<const char *> args1;
transform(args.begin(), args.end(), back_inserter(args1), [](const string &s) { return s.c_str(); } );

// Invoke redis command
RedisCommand command;
command.formatArgv((int)args1.size(), &args1[0], NULL);
command.format(args);
RedisReply r(m_db.get(), command, REDIS_REPLY_ARRAY);
redisReply *reply = r.getContext();

Expand Down Expand Up @@ -109,13 +105,9 @@ bool LuaTable::hget(const vector<string> &luaKeys, const string &field, string &
args.emplace_back(v);
}

// Transform data structure
vector<const char *> args1;
transform(args.begin(), args.end(), back_inserter(args1), [](const string &s) { return s.c_str(); } );

// Invoke redis command
RedisCommand command;
command.formatArgv((int)args1.size(), &args1[0], NULL);
command.format(args);
RedisReply r(m_db.get(), command);
redisReply *reply = r.getContext();

Expand Down
36 changes: 6 additions & 30 deletions common/producerstatetable.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,9 @@ void ProducerStateTable::set(const string &key, const vector<FieldValueTuple> &v
args.emplace_back(fvValue(iv));
}

// Transform data structure
vector<const char *> args1;
transform(args.begin(), args.end(), back_inserter(args1), [](const string &s) { return s.c_str(); } );

// Invoke redis command
RedisCommand command;
command.formatArgv((int)args1.size(), &args1[0], NULL);
command.format(args);
m_pipe->push(command, REDIS_REPLY_NIL);
if (!m_buffered)
{
Expand Down Expand Up @@ -171,13 +167,9 @@ void ProducerStateTable::del(const string &key, const string &op /*= DEL_COMMAND
args.emplace_back("''");
args.emplace_back("''");

// Transform data structure
vector<const char *> args1;
transform(args.begin(), args.end(), back_inserter(args1), [](const string &s) { return s.c_str(); } );

// Invoke redis command
RedisCommand command;
command.formatArgv((int)args1.size(), &args1[0], NULL);
command.format(args);
m_pipe->push(command, REDIS_REPLY_NIL);
if (!m_buffered)
{
Expand Down Expand Up @@ -224,13 +216,9 @@ void ProducerStateTable::set(const std::vector<KeyOpFieldsValuesTuple>& values)
}
}

// Transform data structure
vector<const char *> args1;
transform(args.begin(), args.end(), back_inserter(args1), [](const string &s) { return s.c_str(); } );

// Invoke redis command
RedisCommand command;
command.formatArgv((int)args1.size(), &args1[0], NULL);
command.format(args);
m_pipe->push(command, REDIS_REPLY_NIL);
if (!m_buffered)
{
Expand Down Expand Up @@ -265,13 +253,9 @@ void ProducerStateTable::del(const std::vector<std::string>& keys)
}
args.emplace_back("G");

// Transform data structure
vector<const char *> args1;
transform(args.begin(), args.end(), back_inserter(args1), [](const string &s) { return s.c_str(); } );

// Invoke redis command
RedisCommand command;
command.formatArgv((int)args1.size(), &args1[0], NULL);
command.format(args);
m_pipe->push(command, REDIS_REPLY_NIL);
if (!m_buffered)
{
Expand Down Expand Up @@ -307,13 +291,9 @@ void ProducerStateTable::clear()
args.emplace_back(getStateHashPrefix() + getTableName());
args.emplace_back(getDelKeySetName());

// Transform data structure
vector<const char *> args1;
transform(args.begin(), args.end(), back_inserter(args1), [](const string &s) { return s.c_str(); } );

// Invoke redis command
RedisCommand cmd;
cmd.formatArgv((int)args1.size(), &args1[0], NULL);
cmd.format(args);
m_pipe->push(cmd, REDIS_REPLY_NIL);
m_pipe->flush();
}
Expand Down Expand Up @@ -466,13 +446,9 @@ void ProducerStateTable::apply_temp_view()
SWSS_LOG_DEBUG("apply_view.lua is called with following argument list: %s", ss.str().c_str());
}

// Transform data structure
vector<const char *> args1;
transform(args.begin(), args.end(), back_inserter(args1), [](const string &s) { return s.c_str(); } );

// Invoke redis command
RedisCommand command;
command.formatArgv((int)args1.size(), &args1[0], NULL);
command.format(args);
m_pipe->push(command, REDIS_REPLY_NIL);
m_pipe->flush();

Expand Down
10 changes: 1 addition & 9 deletions common/redisapi.h
Original file line number Diff line number Diff line change
Expand Up @@ -83,16 +83,8 @@ static inline std::set<std::string> runRedisScript(RedisContext &ctx, const std:
args.insert(args.end(), argv.begin(), argv.end());
args.push_back("''");

// Convert to vector of char *
std::vector<const char *> c_args;
transform(
args.begin(),
args.end(),
std::back_inserter(c_args),
[](const std::string& s) { return s.c_str(); } );

RedisCommand command;
command.formatArgv(static_cast<int>(c_args.size()), c_args.data(), NULL);
command.format(args);

std::set<std::string> ret;
try
Expand Down
35 changes: 24 additions & 11 deletions common/rediscommand.cpp
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
#include <vector>
#include <hiredis/hiredis.h>
#include "rediscommand.h"
#include "stringutility.h"

using namespace std;

namespace swss {

RedisCommand::RedisCommand()
: temp(NULL)
: temp(NULL),
len(0)
{
}

Expand All @@ -26,7 +28,7 @@ void RedisCommand::format(const char *fmt, ...)

va_list ap;
va_start(ap, fmt);
int len = redisvFormatCommand(&temp, fmt, ap);
len = redisvFormatCommand(&temp, fmt, ap);
va_end(ap);
if (len == -1) {
throw std::bad_alloc();
Expand All @@ -43,7 +45,7 @@ void RedisCommand::formatArgv(int argc, const char **argv, const size_t *argvlen
temp = nullptr;
}

int len = redisFormatCommandArgv(&temp, argc, argv, argvlen);
len = redisFormatCommandArgv(&temp, argc, argv, argvlen);
if (len == -1) {
throw std::bad_alloc();
}
Expand All @@ -52,11 +54,13 @@ void RedisCommand::formatArgv(int argc, const char **argv, const size_t *argvlen
void RedisCommand::format(const vector<string> &commands)
{
vector<const char*> args;
vector<size_t> lens;
for (auto& command : commands)
{
args.push_back(command.c_str());
lens.push_back(command.size());
}
formatArgv(static_cast<int>(args.size()), args.data(), NULL);
formatArgv(static_cast<int>(args.size()), args.data(), lens.data());
}

/* Format HSET key multiple field value command */
Expand Down Expand Up @@ -96,12 +100,9 @@ void RedisCommand::formatHDEL(const std::string& key, const std::vector<std::str
{
if (fields.empty()) throw std::invalid_argument("empty values");

std::vector<const char *> args = {"HDEL", key.c_str()};
for (const std::string &f : fields)
{
args.push_back(f.c_str());
}
formatArgv(static_cast<int>(args.size()), args.data(), NULL);
std::vector<string> args = {"HDEL", key};
args.insert(args.end(), fields.begin(), fields.end());
format(args);
}

/* Format EXPIRE key field command */
Expand All @@ -122,14 +123,26 @@ void RedisCommand::formatDEL(const std::string& key)
return format("DEL %s", key.c_str());
}

int RedisCommand::appendTo(redisContext *ctx) const
{
return redisAppendFormattedCommand(ctx, c_str(), length());
}

std::string RedisCommand::toPrintableString() const
{
return binary_to_printable(temp, len);
}

const char *RedisCommand::c_str() const
Copy link
Contributor

@qiluo-msft qiluo-msft Jul 8, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

c_str

In your new code, m_str may contains \0. Then it is super dangerous to expose c_str to outside. Do you want to deprecate this function? #Closed

Copy link
Contributor Author

@Pterosaur Pterosaur Jul 9, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so.

  1. Because we have already provided the function len to get the length, so \0 will be handled.
  2. As the STL string's convention, const char * doesn't matter to the `\0'.
  3. If we don't expose the const char * instead of expose a std::string, which is safer but MAY introduce one extra deep copy which is unnecessary.

{
return temp;
}

size_t RedisCommand::length() const
{
return strlen(temp);
if (len <= 0)
return 0;
return static_cast<size_t>(len);
}

}
16 changes: 12 additions & 4 deletions common/rediscommand.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
#include <string>
#include <stdexcept>
#include <map>
#include <hiredis/hiredis.h>

namespace swss {

Expand All @@ -17,6 +18,7 @@ typedef std::tuple<std::string, std::string, std::vector<FieldValueTuple> > KeyO
#define kfvOp std::get<1>
#define kfvFieldsValues std::get<2>


class RedisCommand {
public:
RedisCommand();
Expand Down Expand Up @@ -64,12 +66,18 @@ class RedisCommand {
/* Format DEL key command */
void formatDEL(const std::string& key);

int appendTo(redisContext *ctx) const;

std::string toPrintableString() const;

protected:
const char *c_str() const;

size_t length() const;

private:
char *temp;
int len;
Pterosaur marked this conversation as resolved.
Show resolved Hide resolved
};

template<typename InputIterator>
Expand All @@ -80,15 +88,15 @@ void RedisCommand::formatHSET(const std::string &key,

const char* cmd = "HSET";

std::vector<const char*> args = { cmd, key.c_str() };
std::vector<std::string> args = { cmd, key.c_str() };

for (auto i = start; i != stop; i++)
{
args.push_back(fvField(*i).c_str());
args.push_back(fvValue(*i).c_str());
args.push_back(fvField(*i));
args.push_back(fvValue(*i));
}

formatArgv((int)args.size(), args.data(), NULL);
format(args);
}

}
2 changes: 1 addition & 1 deletion common/redispipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ class RedisPipeline {
case REDIS_REPLY_STATUS:
case REDIS_REPLY_INTEGER:
{
int rc = redisAppendFormattedCommand(m_db->getContext(), command.c_str(), command.length());
int rc = command.appendTo(m_db->getContext());
if (rc != REDIS_OK)
{
// The only reason of error is REDIS_ERR_OOM (Out of memory)
Expand Down
12 changes: 6 additions & 6 deletions common/redisreply.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ inline void guard(FUNC func, const char* command)

RedisReply::RedisReply(RedisContext *ctx, const RedisCommand& command)
{
int rc = redisAppendFormattedCommand(ctx->getContext(), command.c_str(), command.length());
int rc = command.appendTo(ctx->getContext());
if (rc != REDIS_OK)
{
// The only reason of error is REDIS_ERR_OOM (Out of memory)
Expand All @@ -89,9 +89,9 @@ RedisReply::RedisReply(RedisContext *ctx, const RedisCommand& command)
rc = redisGetReply(ctx->getContext(), (void**)&m_reply);
if (rc != REDIS_OK)
{
throw RedisError("Failed to redisGetReply with " + string(command.c_str()), ctx->getContext());
throw RedisError("Failed to redisGetReply with " + command.toPrintableString(), ctx->getContext());
}
guard([&]{checkReply();}, command.c_str());
guard([&]{checkReply();}, command.toPrintableString().c_str());
}

RedisReply::RedisReply(RedisContext *ctx, const string& command)
Expand All @@ -109,19 +109,19 @@ RedisReply::RedisReply(RedisContext *ctx, const string& command)
{
throw RedisError("Failed to redisGetReply with " + command, ctx->getContext());
}
guard([&]{checkReply();}, command.c_str());
guard([&]{checkReply();}, binary_to_printable(command.c_str(), command.length()).c_str());
}

RedisReply::RedisReply(RedisContext *ctx, const RedisCommand& command, int expectedType)
: RedisReply(ctx, command)
{
guard([&]{checkReplyType(expectedType);}, command.c_str());
guard([&]{checkReplyType(expectedType);}, command.toPrintableString().c_str());
}

RedisReply::RedisReply(RedisContext *ctx, const string& command, int expectedType)
: RedisReply(ctx, command)
{
guard([&]{checkReplyType(expectedType);}, command.c_str());
guard([&]{checkReplyType(expectedType);}, binary_to_printable(command.c_str(), command.length()).c_str());
}

RedisReply::RedisReply(redisReply *reply) :
Expand Down
Loading