Skip to content

Commit

Permalink
[ZK filter] Add missing operation supports for multi opcode (#32456)
Browse files Browse the repository at this point in the history
Signed-off-by: Zhewei Hu <zhewei.hu33@gmail.com>
  • Loading branch information
Winbobob authored Feb 29, 2024
1 parent 97ef386 commit 44d4d82
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 32 deletions.
30 changes: 24 additions & 6 deletions source/extensions/filters/network/zookeeper_proxy/decoder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -137,8 +137,11 @@ absl::StatusOr<absl::optional<OpCodes>> DecoderImpl::decodeOnData(Buffer::Instan
status, fmt::format("parseGetDataRequest: {}", status.message()));
break;
case OpCodes::Create:
ABSL_FALLTHROUGH_INTENDED;
case OpCodes::Create2:
ABSL_FALLTHROUGH_INTENDED;
case OpCodes::CreateContainer:
ABSL_FALLTHROUGH_INTENDED;
case OpCodes::CreateTtl:
status = parseCreateRequest(data, offset, len.value(), opcode);
RETURN_INVALID_ARG_ERR_IF_STATUS_NOT_OK(
Expand Down Expand Up @@ -642,22 +645,37 @@ absl::Status DecoderImpl::parseMultiRequest(Buffer::Instance& data, uint64_t& of
break;
}

switch (static_cast<OpCodes>(opcode.value())) {
const auto op = static_cast<OpCodes>(opcode.value());
switch (op) {
case OpCodes::Create:
status = parseCreateRequest(data, offset, len, OpCodes::Create);
EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::Create);
ABSL_FALLTHROUGH_INTENDED;
case OpCodes::Create2:
ABSL_FALLTHROUGH_INTENDED;
case OpCodes::CreateContainer:
ABSL_FALLTHROUGH_INTENDED;
case OpCodes::CreateTtl:
status = parseCreateRequest(data, offset, len, op);
EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, op);
break;
case OpCodes::SetData:
status = parseSetRequest(data, offset, len);
EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::SetData);
EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, op);
break;
case OpCodes::Check:
status = parseCheckRequest(data, offset, len);
EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::Check);
EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, op);
break;
case OpCodes::Delete:
status = parseDeleteRequest(data, offset, len);
EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, OpCodes::Delete);
EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, op);
break;
case OpCodes::GetChildren:
status = parseGetChildrenRequest(data, offset, len, false);
EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, op);
break;
case OpCodes::GetData:
status = parseGetDataRequest(data, offset, len);
EMIT_DECODER_ERR_AND_RETURN_IF_STATUS_NOT_OK(status, op);
break;
default:
callbacks_.onDecodeError(absl::nullopt);
Expand Down
72 changes: 46 additions & 26 deletions test/extensions/filters/network/zookeeper_proxy/filter_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -223,15 +223,18 @@ class ZooKeeperFilterTest : public testing::Test {
return buffer;
}

Buffer::OwnedImpl
encodePathWatch(const std::string& path, const bool watch, const int32_t xid = 1000,
const int32_t opcode = enumToSignedInt(OpCodes::GetData)) const {
Buffer::OwnedImpl encodePathWatch(const std::string& path, const bool watch,
const int32_t xid = 1000,
const int32_t opcode = enumToSignedInt(OpCodes::GetData),
const bool txn = false) const {
Buffer::OwnedImpl buffer;

buffer.writeBEInt<int32_t>(13 + path.length());
buffer.writeBEInt<int32_t>(xid);
// Opcode.
buffer.writeBEInt<int32_t>(opcode);
if (!txn) {
buffer.writeBEInt<int32_t>(13 + path.length());
buffer.writeBEInt<int32_t>(xid);
buffer.writeBEInt<int32_t>(opcode);
}

// Path.
addString(buffer, path);
// Watch.
Expand Down Expand Up @@ -311,7 +314,7 @@ class ZooKeeperFilterTest : public testing::Test {
return buffer;
}

Buffer::OwnedImpl encodeCreateRequestWithNegativeDataLen(
Buffer::OwnedImpl encodeCreateRequestWithoutZnodeData(
const std::string& path, const int32_t create_flag_val, const bool txn = false,
const int32_t opcode = enumToSignedInt(OpCodes::Create)) const {
Buffer::OwnedImpl buffer;
Expand Down Expand Up @@ -596,11 +599,11 @@ class ZooKeeperFilterTest : public testing::Test {
store_.counter(absl::StrCat("test.zookeeper.", opname, "_decoder_error")).value());
}

void testCreateWithNegativeDataLen(CreateFlags flag, const int32_t flag_val,
const OpCodes opcode = OpCodes::Create) {
void testCreateWithoutZnodeData(CreateFlags flag, const int32_t flag_val,
const OpCodes opcode = OpCodes::Create) {
initialize();
Buffer::OwnedImpl data =
encodeCreateRequestWithNegativeDataLen("/foo", flag_val, false, enumToSignedInt(opcode));
encodeCreateRequestWithoutZnodeData("/foo", flag_val, false, enumToSignedInt(opcode));
std::string opname = "create";

switch (opcode) {
Expand Down Expand Up @@ -1088,8 +1091,8 @@ TEST_F(ZooKeeperFilterTest, CreateRequestPersistent) {
testResponse({{{"opname", "create_resp"}, {"zxid", "2000"}, {"error", "0"}}, {{"bytes", "20"}}});
}

TEST_F(ZooKeeperFilterTest, CreateRequestPersistentWithNegativeDataLen) {
testCreateWithNegativeDataLen(CreateFlags::Persistent, 0);
TEST_F(ZooKeeperFilterTest, CreateRequestPersistentWithoutZnodeData) {
testCreateWithoutZnodeData(CreateFlags::Persistent, 0);
testResponse({{{"opname", "create_resp"}, {"zxid", "2000"}, {"error", "0"}}, {{"bytes", "20"}}});
}

Expand Down Expand Up @@ -1283,33 +1286,50 @@ TEST_F(ZooKeeperFilterTest, CheckRequest) {
TEST_F(ZooKeeperFilterTest, MultiRequest) {
initialize();

Buffer::OwnedImpl create1 = encodeCreateRequest("/foo", "1", 0, true);
Buffer::OwnedImpl create2 = encodeCreateRequest("/bar", "1", 0, true);
Buffer::OwnedImpl create3 = encodeCreateRequestWithNegativeDataLen("/baz", 0, true);
Buffer::OwnedImpl check1 = encodePathVersion("/foo", 100, enumToSignedInt(OpCodes::Check), true);
Buffer::OwnedImpl set1 = encodeSetRequest("/bar", "2", -1, true);
Buffer::OwnedImpl create1 =
encodeCreateRequest("/foo", "1", 0, true, 1000, enumToSignedInt(OpCodes::Create));
Buffer::OwnedImpl create2 =
encodeCreateRequest("/bar", "1", 0, true, 1000, enumToSignedInt(OpCodes::Create2));
Buffer::OwnedImpl create3 =
encodeCreateRequest("/baz", "1", 0, true, 1000, enumToSignedInt(OpCodes::CreateContainer));
Buffer::OwnedImpl create4 =
encodeCreateRequestWithoutZnodeData("/qux", 0, true, enumToSignedInt(OpCodes::CreateTtl));
Buffer::OwnedImpl check = encodePathVersion("/foo", 100, enumToSignedInt(OpCodes::Check), true);
Buffer::OwnedImpl set = encodeSetRequest("/bar", "2", -1, true);
Buffer::OwnedImpl delete1 = encodeDeleteRequest("/abcd", 1, true);
Buffer::OwnedImpl delete2 = encodeDeleteRequest("/efg", 2, true);
Buffer::OwnedImpl getchildren =
encodePathWatch("/foo", false, 1000, enumToSignedInt(OpCodes::GetChildren), true);
Buffer::OwnedImpl getdata =
encodePathWatch("/bar", true, 1000, enumToSignedInt(OpCodes::GetData), true);

std::vector<std::pair<int32_t, Buffer::OwnedImpl>> ops;
ops.push_back(std::make_pair(enumToSignedInt(OpCodes::Create), std::move(create1)));
ops.push_back(std::make_pair(enumToSignedInt(OpCodes::Create), std::move(create2)));
ops.push_back(std::make_pair(enumToSignedInt(OpCodes::Create), std::move(create3)));
ops.push_back(std::make_pair(enumToSignedInt(OpCodes::Check), std::move(check1)));
ops.push_back(std::make_pair(enumToSignedInt(OpCodes::SetData), std::move(set1)));
ops.push_back(std::make_pair(enumToSignedInt(OpCodes::Create2), std::move(create2)));
ops.push_back(std::make_pair(enumToSignedInt(OpCodes::CreateContainer), std::move(create3)));
ops.push_back(std::make_pair(enumToSignedInt(OpCodes::CreateTtl), std::move(create4)));
ops.push_back(std::make_pair(enumToSignedInt(OpCodes::Check), std::move(check)));
ops.push_back(std::make_pair(enumToSignedInt(OpCodes::SetData), std::move(set)));
ops.push_back(std::make_pair(enumToSignedInt(OpCodes::Delete), std::move(delete1)));
ops.push_back(std::make_pair(enumToSignedInt(OpCodes::Delete), std::move(delete2)));
ops.push_back(std::make_pair(enumToSignedInt(OpCodes::GetChildren), std::move(getchildren)));
ops.push_back(std::make_pair(enumToSignedInt(OpCodes::GetData), std::move(getdata)));

Buffer::OwnedImpl data = encodeMultiRequest(ops);

EXPECT_EQ(Envoy::Network::FilterStatus::Continue, filter_->onData(data, false));
EXPECT_EQ(1UL, config_->stats().multi_rq_.value());
EXPECT_EQ(200UL, config_->stats().request_bytes_.value());
EXPECT_EQ(200UL, config_->stats().multi_rq_bytes_.value());
EXPECT_EQ(3UL, config_->stats().create_rq_.value());
EXPECT_EQ(1UL, config_->stats().setdata_rq_.value());
EXPECT_EQ(266UL, config_->stats().request_bytes_.value());
EXPECT_EQ(266UL, config_->stats().multi_rq_bytes_.value());
EXPECT_EQ(1UL, config_->stats().create_rq_.value());
EXPECT_EQ(1UL, config_->stats().create2_rq_.value());
EXPECT_EQ(1UL, config_->stats().createcontainer_rq_.value());
EXPECT_EQ(1UL, config_->stats().createttl_rq_.value());
EXPECT_EQ(1UL, config_->stats().check_rq_.value());
EXPECT_EQ(1UL, config_->stats().setdata_rq_.value());
EXPECT_EQ(2UL, config_->stats().delete_rq_.value());
EXPECT_EQ(1UL, config_->stats().getchildren_rq_.value());
EXPECT_EQ(1UL, config_->stats().getdata_rq_.value());
EXPECT_EQ(0UL, config_->stats().decoder_error_.value());
EXPECT_EQ(0UL, config_->stats().multi_decoder_error_.value());

Expand Down

0 comments on commit 44d4d82

Please sign in to comment.