diff --git a/.ignore_words b/.ignore_words index 33b02228efde..5e93545370e1 100644 --- a/.ignore_words +++ b/.ignore_words @@ -6,3 +6,4 @@ shttp nd hel nulll +smove diff --git a/apisix/stream/xrpc/protocols/redis/commands.lua b/apisix/stream/xrpc/protocols/redis/commands.lua new file mode 100644 index 000000000000..ff3338fdef9a --- /dev/null +++ b/apisix/stream/xrpc/protocols/redis/commands.lua @@ -0,0 +1,222 @@ +-- +-- Licensed to the Apache Software Foundation (ASF) under one or more +-- contributor license agreements. See the NOTICE file distributed with +-- this work for additional information regarding copyright ownership. +-- The ASF licenses this file to You under the Apache License, Version 2.0 +-- (the "License"); you may not use this file except in compliance with +-- the License. You may obtain a copy of the License at +-- +-- http://www.apache.org/licenses/LICENSE-2.0 +-- +-- Unless required by applicable law or agreed to in writing, software +-- distributed under the License is distributed on an "AS IS" BASIS, +-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +-- See the License for the specific language governing permissions and +-- limitations under the License. +-- +local ipairs = ipairs +local pairs = pairs + + +local cmd_to_key_finder = {} +--[[ +-- the data is generated from the script below +local redis = require "resty.redis" +local red = redis:new() + +local ok, err = red:connect("127.0.0.1", 6379) +if not ok then + ngx.say("failed to connect: ", err) + return +end + +local res = red:command("info") +local map = {} +for _, r in ipairs(res) do + local first_key = r[4] + local last_key = r[5] + local step = r[6] + local idx = first_key .. ':' .. last_key .. ':' .. step + + if idx ~= "1:1:1" then + -- "1:1:1" is the default + if map[idx] then + table.insert(map[idx], r[1]) + else + map[idx] = {r[1]} + end + end +end +for _, r in pairs(map) do + table.sort(r) +end +local dump = require('pl.pretty').dump; dump(map) +--]] +local key_to_cmd = { + ["0:0:0"] = { + "acl", + "asking", + "auth", + "bgrewriteaof", + "bgsave", + "blmpop", + "bzmpop", + "client", + "cluster", + "command", + "config", + "dbsize", + "debug", + "discard", + "echo", + "eval", + "eval_ro", + "evalsha", + "evalsha_ro", + "exec", + "failover", + "fcall", + "fcall_ro", + "flushall", + "flushdb", + "function", + "hello", + "info", + "keys", + "lastsave", + "latency", + "lmpop", + "lolwut", + "memory", + "module", + "monitor", + "multi", + "object", + "pfselftest", + "ping", + "psubscribe", + "psync", + "publish", + "pubsub", + "punsubscribe", + "quit", + "randomkey", + "readonly", + "readwrite", + "replconf", + "replicaof", + "reset", + "role", + "save", + "scan", + "script", + "select", + "shutdown", + "sintercard", + "slaveof", + "slowlog", + "subscribe", + "swapdb", + "sync", + "time", + "unsubscribe", + "unwatch", + "wait", + "xgroup", + "xinfo", + "xread", + "xreadgroup", + "zdiff", + "zinter", + "zintercard", + "zmpop", + "zunion" + }, + ["1:-1:1"] = { + "del", + "exists", + "mget", + "pfcount", + "pfmerge", + "sdiff", + "sdiffstore", + "sinter", + "sinterstore", + "ssubscribe", + "sunion", + "sunionstore", + "sunsubscribe", + "touch", + "unlink", + "watch" + }, + ["1:-1:2"] = { + "mset", + "msetnx" + }, + ["1:-2:1"] = { + "blpop", + "brpop", + "bzpopmax", + "bzpopmin" + }, + ["1:2:1"] = { + "blmove", + "brpoplpush", + "copy", + "geosearchstore", + "lcs", + "lmove", + "rename", + "renamenx", + "rpoplpush", + "smove", + "zrangestore" + }, + ["2:-1:1"] = { + "bitop" + }, + ["2:2:1"] = { + "pfdebug" + }, + ["3:3:1"] = { + "migrate" + } +} +local key_finders = { + ["0:0:0"] = false, + ["1:-1:1"] = function (idx, narg) + return 1 < idx + end, + ["1:-1:2"] = function (idx, narg) + return 1 < idx and idx % 2 == 0 + end, + ["1:-2:1"] = function (idx, narg) + return 1 < idx and idx < narg - 1 + end, + ["1:2:1"] = function (idx, narg) + return idx == 2 or idx == 3 + end, + ["2:-1:1"] = function (idx, narg) + return 2 < idx + end, + ["2:2:1"] = function (idx, narg) + return idx == 3 + end, + ["3:3:1"] = function (idx, narg) + return idx == 4 + end +} +for k, cmds in pairs(key_to_cmd) do + for _, cmd in ipairs(cmds) do + cmd_to_key_finder[cmd] = key_finders[k] + end +end + + +return { + cmd_to_key_finder = cmd_to_key_finder, + default_key_finder = function (idx, narg) + return idx == 2 + end, +} diff --git a/apisix/stream/xrpc/protocols/redis/init.lua b/apisix/stream/xrpc/protocols/redis/init.lua index ac31a4521e3a..2635cca713dd 100644 --- a/apisix/stream/xrpc/protocols/redis/init.lua +++ b/apisix/stream/xrpc/protocols/redis/init.lua @@ -16,6 +16,7 @@ -- local core = require("apisix.core") local sdk = require("apisix.stream.xrpc.sdk") +local commands = require("apisix.stream.xrpc.protocols.redis.commands") local xrpc_socket = require("resty.apisix.stream.xrpc.socket") local ffi = require("ffi") local ffi_str = ffi.string @@ -23,8 +24,10 @@ local math_random = math.random local OK = ngx.OK local DECLINED = ngx.DECLINED local DONE = ngx.DONE +local sleep = ngx.sleep local str_byte = string.byte local str_fmt = string.format +local ipairs = ipairs local tonumber = tonumber @@ -32,6 +35,7 @@ local tonumber = tonumber -- There is no plan to support inline command format local _M = {} local MAX_LINE_LEN = 128 +local MAX_VALUE_LEN = 128 local PREFIX_ARR = str_byte("*") local PREFIX_STR = str_byte("$") local PREFIX_STA = str_byte("+") @@ -39,7 +43,62 @@ local PREFIX_INT = str_byte(":") local PREFIX_ERR = str_byte("-") +local lrucache = core.lrucache.new({ + type = "plugin", +}) + + +local function create_matcher(conf) + local matcher = {} + --[[ + {"delay": 5, "key":"x", "commands":["GET", "MGET"]} + {"delay": 5, "commands":["GET"]} + => { + get = {keys = {x = {delay = 5}, * = {delay = 5}}} + mget = {keys = {x = {delay = 5}}} + } + ]]-- + for _, rule in ipairs(conf.faults) do + for _, cmd in ipairs(rule.commands) do + cmd = cmd:lower() + local key = rule.key + local kf = commands.cmd_to_key_finder[cmd] + local key_matcher = matcher[cmd] + if not key_matcher then + key_matcher = { + keys = {} + } + matcher[cmd] = key_matcher + end + + if not key or kf == false then + key = "*" + end + + if key_matcher.keys[key] then + core.log.warn("override existent fault rule of cmd: ", cmd, ", key: ", key) + end + + key_matcher.keys[key] = rule + end + end + + return matcher +end + + +local function get_matcher(conf, ctx) + return core.lrucache.plugin_ctx(lrucache, ctx, nil, create_matcher, conf) +end + + function _M.init_downstream(session) + local conf = session.route.protocol.conf + if conf and conf.faults then + local matcher = get_matcher(conf, session.conn_ctx) + session.matcher = matcher + end + session.req_id_seq = 0 session.resp_id_seq = 0 return xrpc_socket.downstream.socket() @@ -83,26 +142,53 @@ local function read_req(session, sk) local cmd_line = core.tablepool.fetch("xrpc_redis_cmd_line", narg, 0) - for i = 1, narg do + local n, err = read_len(sk) + if not n then + return nil, err + end + + local p, err = sk:read(n + 2) + if not p then + return nil, err + end + + local s = ffi_str(p, n) + cmd_line[1] = s + + local key_finder + local matcher = session.matcher + if matcher then + matcher = matcher[s:lower()] + if matcher then + key_finder = commands.cmd_to_key_finder[s] or commands.default_key_finder + end + end + + for i = 2, narg do + local is_key = false + if key_finder then + is_key = key_finder(i, narg) + end + local n, err = read_len(sk) if not n then return nil, err end local s - if n > 1024 then + if not is_key and n > MAX_VALUE_LEN then -- avoid recording big value - local p, err = sk:read(1024) + local p, err = sk:read(MAX_VALUE_LEN) if not p then return nil, err end - local ok, err = sk:drain(n - 1024 + 2) + local ok, err = sk:drain(n - MAX_VALUE_LEN + 2) if not ok then return nil, err end - s = ffi_str(p, 1024) .. "..." + s = ffi_str(p, MAX_VALUE_LEN) .. "...(" .. n .. " bytes)" else local p, err = sk:read(n + 2) if not p then @@ -110,6 +196,11 @@ local function read_req(session, sk) end s = ffi_str(p, n) + + if is_key and matcher.keys[s] then + matcher = matcher.keys[s] + key_finder = nil + end end cmd_line[i] = s @@ -121,6 +212,18 @@ local function read_req(session, sk) ctx.cmd = ctx.cmd_line[1] local pipelined = sk:has_pending_data() + + if matcher then + if matcher.keys then + -- try to match any key of this command + matcher = matcher.keys["*"] + end + + if matcher then + sleep(matcher.delay) + end + end + return true, nil, pipelined end diff --git a/apisix/stream/xrpc/protocols/redis/schema.lua b/apisix/stream/xrpc/protocols/redis/schema.lua index 49a7a0e484b2..0b6c90c65bc4 100644 --- a/apisix/stream/xrpc/protocols/redis/schema.lua +++ b/apisix/stream/xrpc/protocols/redis/schema.lua @@ -20,6 +20,31 @@ local core = require("apisix.core") local schema = { type = "object", properties = { + faults = { + type = "array", + minItems = 1, + items = { + type = "object", + properties = { + commands = { + type = "array", + minItems = 1, + items = { + type = "string" + }, + }, + key = { + type = "string", + minLength = 1, + }, + delay = { + type = "number", + description = "additional delay in seconds", + } + }, + required = {"commands", "delay"} + }, + }, }, } diff --git a/apisix/stream/xrpc/runner.lua b/apisix/stream/xrpc/runner.lua index 8de3176ffe57..ea807f60a181 100644 --- a/apisix/stream/xrpc/runner.lua +++ b/apisix/stream/xrpc/runner.lua @@ -28,9 +28,10 @@ local _M = {} local function open_session(conn_ctx) conn_ctx.xrpc_session = { - _upstream_conf = conn_ctx.matched_upstream, + conn_ctx = conn_ctx, + route = conn_ctx.matched_route.value, -- fields start with '_' should not be accessed by the protocol implementation - _route = conn_ctx.matched_route.value, + _upstream_conf = conn_ctx.matched_upstream, _ctxs = {}, } return conn_ctx.xrpc_session diff --git a/apisix/stream/xrpc/sdk.lua b/apisix/stream/xrpc/sdk.lua index 9773cb36f80b..3e3f4557ad03 100644 --- a/apisix/stream/xrpc/sdk.lua +++ b/apisix/stream/xrpc/sdk.lua @@ -125,8 +125,8 @@ end -- @treturn table the new router under the specific protocol -- @treturn string the new router version function _M.get_router(session, version) - local protocol_name = session._route.protocol.name - local id = session._route.id + local protocol_name = session.route.protocol.name + local id = session.route.id local items, conf_version = router.routes() if version == conf_version then diff --git a/t/xrpc/redis.t b/t/xrpc/redis.t index 52a5fa0f50e6..a3a9ec9ca4da 100644 --- a/t/xrpc/redis.t +++ b/t/xrpc/redis.t @@ -257,3 +257,311 @@ hget animals: bark } --- response_body --- stream_conf_enable + + + +=== TEST 6: delay +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local etcd = require("apisix.core.etcd") + local code, body = t('/apisix/admin/stream_routes/1', + ngx.HTTP_PUT, + { + protocol = { + name = "redis", + conf = { + faults = { + {delay = 0.01, key = "ignored", commands = {"Ping", "time"}} + } + } + }, + upstream = { + nodes = { + ["127.0.0.1:6379"] = 1 + }, + type = "roundrobin" + } + } + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- response_body +passed + + + +=== TEST 7: hit +--- config + location /t { + content_by_lua_block { + local redis = require "resty.redis" + local red = redis:new() + + local ok, err = red:connect("127.0.0.1", $TEST_NGINX_REDIS_PORT) + if not ok then + ngx.say("failed to connect: ", err) + return + end + + local start = ngx.now() + local res, err = red:ping() + if not res then + ngx.say(err) + return + end + local now = ngx.now() + -- use integer to bypass float point number precision problem + if math.ceil((now - start) * 1000) < 10 then + ngx.say(now, " ", start) + return + end + start = now + + local res, err = red:time() + if not res then + ngx.say(err) + return + end + local now = ngx.now() + if math.ceil((now - start) * 1000) < 10 then + ngx.say(now, " ", start) + return + end + start = now + + red:init_pipeline() + red:time() + red:time() + red:get("A") + + local results, err = red:commit_pipeline() + if not results then + ngx.say("failed to commit: ", err) + return + end + local now = ngx.now() + if math.ceil((now - start) * 1000) < 20 or math.ceil((now - start) * 1000) > 30 then + ngx.say(now, " ", start) + return + end + + ngx.say("ok") + } + } +--- response_body +ok +--- stream_conf_enable + + + +=== TEST 8: DFS match +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local etcd = require("apisix.core.etcd") + local code, body = t('/apisix/admin/stream_routes/1', + ngx.HTTP_PUT, + { + protocol = { + name = "redis", + conf = { + faults = { + {delay = 0.02, key = "a", commands = {"get"}}, + {delay = 0.01, commands = {"get", "set"}}, + } + } + }, + upstream = { + nodes = { + ["127.0.0.1:6379"] = 1 + }, + type = "roundrobin" + } + } + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- response_body +passed + + + +=== TEST 9: hit +--- config + location /t { + content_by_lua_block { + local redis = require "resty.redis" + local red = redis:new() + + local ok, err = red:connect("127.0.0.1", $TEST_NGINX_REDIS_PORT) + if not ok then + ngx.say("failed to connect: ", err) + return + end + + local start = ngx.now() + local res, err = red:get("a") + if not res then + ngx.say(err) + return + end + local now = ngx.now() + if math.ceil((now - start) * 1000) < 20 then + ngx.say(now, " ", start) + return + end + start = now + + local res, err = red:set("a", "a") + if not res then + ngx.say(err) + return + end + local now = ngx.now() + if math.ceil((now - start) * 1000) < 10 then + ngx.say(now, " ", start) + return + end + start = now + + red:init_pipeline() + red:get("b") + red:set("A", "a") + + local results, err = red:commit_pipeline() + if not results then + ngx.say("failed to commit: ", err) + return + end + local now = ngx.now() + if math.ceil((now - start) * 1000) < 20 or math.ceil((now - start) * 1000) > 30 then + ngx.say(now, " ", start) + return + end + + ngx.say("ok") + } + } +--- response_body +ok +--- stream_conf_enable + + + +=== TEST 10: multi keys +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local etcd = require("apisix.core.etcd") + local code, body = t('/apisix/admin/stream_routes/1', + ngx.HTTP_PUT, + { + protocol = { + name = "redis", + conf = { + faults = { + {delay = 0.03, key = "b", commands = {"del"}}, + {delay = 0.02, key = "a", commands = {"mset"}}, + {delay = 0.01, key = "b", commands = {"mset"}}, + } + } + }, + upstream = { + nodes = { + ["127.0.0.1:6379"] = 1 + }, + type = "roundrobin" + } + } + ) + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- response_body +passed + + + +=== TEST 11: hit +--- config + location /t { + content_by_lua_block { + local redis = require "resty.redis" + local red = redis:new() + + local ok, err = red:connect("127.0.0.1", $TEST_NGINX_REDIS_PORT) + if not ok then + ngx.say("failed to connect: ", err) + return + end + + local start = ngx.now() + local res, err = red:mset("c", 1, "a", 2, "b", 3) + if not res then + ngx.say(err) + return + end + local now = ngx.now() + if math.ceil((now - start) * 1000) < 20 then + ngx.say(now, " ", start) + return + end + start = now + + local res, err = red:mset("b", 2, "a", 3) + if not res then + ngx.say(err) + return + end + local now = ngx.now() + if math.ceil((now - start) * 1000) < 10 or math.ceil((now - start) * 1000) > 15 then + ngx.say(now, " ", start) + return + end + start = now + + local res, err = red:mset("c", "a") + if not res then + ngx.say(err) + return + end + local now = ngx.now() + if math.ceil((now - start) * 1000) > 5 then + ngx.say(now, " ", start) + return + end + start = now + + local res, err = red:del("a", "b") + if not res then + ngx.say(err) + return + end + local now = ngx.now() + if math.ceil((now - start) * 1000) < 30 then + ngx.say(now, " ", start) + return + end + start = now + + ngx.say("ok") + } + } +--- response_body +ok +--- stream_conf_enable