diff --git a/common/consumer_table_pops.lua b/common/consumer_table_pops.lua index ee00fe240..172993bd9 100644 --- a/common/consumer_table_pops.lua +++ b/common/consumer_table_pops.lua @@ -23,7 +23,9 @@ for i = n, 1, -3 do end table.insert(rets, ret) - if op == 'bulkset' or op == 'bulkcreate' or op == 'bulkremove' then + if ARGV[2] == "0" then + -- do nothing, we don't want to modify redis during pop + elseif op == 'bulkset' or op == 'bulkcreate' or op == 'bulkremove' then -- key is "OBJECT_TYPE:num", extract object type from key key = key:sub(1, string.find(key, ':') - 1) diff --git a/common/consumertable.cpp b/common/consumertable.cpp index 8d0f3531b..fa340f88f 100644 --- a/common/consumertable.cpp +++ b/common/consumertable.cpp @@ -17,6 +17,7 @@ namespace swss { ConsumerTable::ConsumerTable(DBConnector *db, const string &tableName, int popBatchSize, int pri) : ConsumerTableBase(db, tableName, popBatchSize, pri) , TableName_KeyValueOpQueues(tableName) + , m_modifyRedis(true) { std::string luaScript = loadLuaScript("consumer_table_pops.lua"); m_shaPop = loadRedisScript(db, luaScript); @@ -39,15 +40,23 @@ ConsumerTable::ConsumerTable(DBConnector *db, const string &tableName, int popBa setQueueLength(len/3); } +void ConsumerTable::setModifyRedis(bool modify) +{ + SWSS_LOG_ENTER(); + + m_modifyRedis = modify; +} + void ConsumerTable::pops(deque &vkco, const string &prefix) { RedisCommand command; command.format( - "EVALSHA %s 2 %s %s %d ''", + "EVALSHA %s 2 %s %s %d %d", m_shaPop.c_str(), getKeyValueOpQueueTableName().c_str(), (prefix+getTableName()).c_str(), - POP_BATCH_SIZE); + POP_BATCH_SIZE, + m_modifyRedis ? 1 : 0); RedisReply r(m_db, command, REDIS_REPLY_ARRAY); diff --git a/common/consumertable.h b/common/consumertable.h index f56b78963..449fa2984 100644 --- a/common/consumertable.h +++ b/common/consumertable.h @@ -18,8 +18,19 @@ class ConsumerTable : public ConsumerTableBase, public TableName_KeyValueOpQueue /* Get multiple pop elements */ void pops(std::deque &vkco, const std::string &prefix = EMPTY_PREFIX); + void setModifyRedis(bool modify); private: std::string m_shaPop; + + /** + * @brief Modify Redis database. + * + * If set to false, will not make changes to database during POPs operation. + * This will be utilized during synchronous mode. + * + * Default is true. + */ + bool m_modifyRedis; }; } diff --git a/tests/redis_ut.cpp b/tests/redis_ut.cpp index 351935e9c..02d133ed8 100644 --- a/tests/redis_ut.cpp +++ b/tests/redis_ut.cpp @@ -786,6 +786,45 @@ TEST(ProducerConsumer, PopEmpty) EXPECT_EQ(fvs.size(), 0U); } +TEST(ProducerConsumer, PopNoModify) +{ + clearDB(); + + std::string tableName = "tableName"; + + DBConnector db("TEST_DB", 0, true); + ProducerTable p(&db, tableName); + + std::vector values; + + FieldValueTuple fv("f", "v"); + values.push_back(fv); + + p.set("key", values, "set"); + + ConsumerTable c(&db, tableName); + + c.setModifyRedis(false); + + std::string key; + std::string op; + std::vector fvs; + + c.pop(key, op, fvs); //, "prefixNoMod_"); + + EXPECT_EQ(key, "key"); + EXPECT_EQ(op, "set"); + EXPECT_EQ(fvField(fvs[0]), "f"); + EXPECT_EQ(fvValue(fvs[0]), "v"); + + Table t(&db, tableName); + + string value_got; + bool r = t.hget("key", "f", value_got); + + ASSERT_FALSE(r); +} + TEST(ProducerConsumer, ConsumerSelectWithInitData) { clearDB();