From 87d4d62dfcac8928c63420f505b11413248ca681 Mon Sep 17 00:00:00 2001 From: kingluo Date: Tue, 6 Jun 2023 14:14:09 +0800 Subject: [PATCH 1/4] fix: save and restore pb state --- apisix/core/pubsub.lua | 3 +- apisix/plugins/grpc-transcode/proto.lua | 4 +- apisix/plugins/grpc-transcode/request.lua | 2 + apisix/plugins/grpc-transcode/response.lua | 2 + apisix/plugins/grpc-transcode/util.lua | 2 - t/plugin/opentelemetry-bugfix-pb-state.t | 192 +++++++++++++++++++++ 6 files changed, 200 insertions(+), 5 deletions(-) create mode 100644 t/plugin/opentelemetry-bugfix-pb-state.t diff --git a/apisix/core/pubsub.lua b/apisix/core/pubsub.lua index 25ac46f13eeb..18bb887001c1 100644 --- a/apisix/core/pubsub.lua +++ b/apisix/core/pubsub.lua @@ -185,9 +185,10 @@ function _M.wait(self) end -- recovery of stored pb_store - pb.state(pb_state) + local pb_old_state = pb.state(pb_state) local data, err = pb.decode("PubSubReq", raw_data) + pb.state(pb_old_state) if not data then log.error("pubsub server receives undecodable data, err: ", err) send_error(ws, 0, "wrong command") diff --git a/apisix/plugins/grpc-transcode/proto.lua b/apisix/plugins/grpc-transcode/proto.lua index 1c9b7718fdc1..347ec39eae17 100644 --- a/apisix/plugins/grpc-transcode/proto.lua +++ b/apisix/plugins/grpc-transcode/proto.lua @@ -99,7 +99,7 @@ end local function compile_proto(content) -- clear pb state - pb.state(nil) + local old_pb_state = pb.state(nil) local compiled, err = compile_proto_text(content) if not compiled then @@ -110,7 +110,7 @@ local function compile_proto(content) end -- fetch pb state - compiled.pb_state = pb.state(nil) + compiled.pb_state = pb.state(old_pb_state) return compiled end diff --git a/apisix/plugins/grpc-transcode/request.lua b/apisix/plugins/grpc-transcode/request.lua index 88d2fcfb9ec8..d98c2fc7a59e 100644 --- a/apisix/plugins/grpc-transcode/request.lua +++ b/apisix/plugins/grpc-transcode/request.lua @@ -39,7 +39,9 @@ return function (proto, service, method, pb_option, deadline, default_values) util.set_options(proto, pb_option) local map_message = util.map_message(m.input_type, default_values or {}) + local pb_old_state = pb.state(proto.pb_state) local ok, encoded = pcall(pb.encode, m.input_type, map_message) + pb.state(pb_old_state) if not ok or not encoded then return false, "failed to encode request data to protobuf", 400 diff --git a/apisix/plugins/grpc-transcode/response.lua b/apisix/plugins/grpc-transcode/response.lua index dee267b77c1f..db4e656c3b81 100644 --- a/apisix/plugins/grpc-transcode/response.lua +++ b/apisix/plugins/grpc-transcode/response.lua @@ -121,7 +121,9 @@ return function(ctx, proto, service, method, pb_option, show_status_in_body, sta util.set_options(proto, pb_option) local err_msg + local pb_old_state = pb.state(proto.pb_state) local decoded = pb.decode(m.output_type, buffer) + pb.state(pb_old_state) if not decoded then err_msg = "failed to decode response data by protobuf" ngx.arg[1] = err_msg diff --git a/apisix/plugins/grpc-transcode/util.lua b/apisix/plugins/grpc-transcode/util.lua index 4a27c1d3b9df..a95cb8202041 100644 --- a/apisix/plugins/grpc-transcode/util.lua +++ b/apisix/plugins/grpc-transcode/util.lua @@ -48,8 +48,6 @@ function _M.find_method(proto, service, method) return nil end - -- restore pb state - pb.state(proto.pb_state) return res end diff --git a/t/plugin/opentelemetry-bugfix-pb-state.t b/t/plugin/opentelemetry-bugfix-pb-state.t new file mode 100644 index 000000000000..02e2d1b96bb4 --- /dev/null +++ b/t/plugin/opentelemetry-bugfix-pb-state.t @@ -0,0 +1,192 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +use t::APISIX 'no_plan'; + +add_block_preprocessor(sub { + my ($block) = @_; + + if (!$block->extra_yaml_config) { + my $extra_yaml_config = <<_EOC_; +plugins: + - example-plugin + - key-auth + - opentelemetry +plugin_attr: + opentelemetry: + batch_span_processor: + max_export_batch_size: 1 + inactive_timeout: 0.5 +_EOC_ + $block->set_value("extra_yaml_config", $extra_yaml_config); + } + + + if (!$block->extra_init_by_lua) { + my $extra_init_by_lua = <<_EOC_; +-- mock exporter http client +local client = require("opentelemetry.trace.exporter.http_client") +client.do_request = function() + ngx.log(ngx.INFO, "opentelemetry export span") + return "ok" +end +local ctx_new = require("opentelemetry.context").new +require("opentelemetry.context").new = function (...) + local ctx = ctx_new(...) + local current = ctx.current + ctx.current = function (...) + ngx.log(ngx.INFO, "opentelemetry context current") + return current(...) + end + return ctx +end +_EOC_ + + $block->set_value("extra_init_by_lua", $extra_init_by_lua); + } + + if (!$block->request) { + $block->set_value("request", "GET /t"); + } + + $block; +}); + +run_tests; + +__DATA__ + +=== TEST 3: set additional_attributes with match +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/routes/1', + ngx.HTTP_PUT, + [[{ + "name": "route_name", + "plugins": { + "opentelemetry": { + "sampler": { + "name": "always_on" + }, + "additional_header_prefix_attributes": [ + "x-my-header-*" + ] + } + }, + "upstream": { + "nodes": { + "127.0.0.1:1980": 1 + }, + "type": "roundrobin" + }, + "uri": "/hello" + }]] + ) + + if code >= 300 then + ngx.status = code + end + ngx.say(body) + } + } +--- response_body +passed + + + +=== TEST 4: opentelemetry expands headers +--- extra_init_by_lua + local otlp = require("opentelemetry.trace.exporter.otlp") + local orig_export_spans = otlp.export_spans + otlp.export_spans = function(self, spans) + if (#spans ~= 1) then + ngx.log(ngx.ERR, "unexpected spans length: ", #spans) + return + end + + local attributes_names = {} + local attributes = {} + local span = spans[1] + for _, attribute in ipairs(span.attributes) do + if attribute.key == "hostname" then + -- remove any randomness + goto skip + end + table.insert(attributes_names, attribute.key) + attributes[attribute.key] = attribute.value.string_value or "" + ::skip:: + end + table.sort(attributes_names) + for _, attribute in ipairs(attributes_names) do + ngx.log(ngx.INFO, "attribute " .. attribute .. ": \"" .. attributes[attribute] .. "\"") + end + + ngx.log(ngx.INFO, "opentelemetry export span") + return orig_export_spans(self, spans) + end +--- config + location /t { + content_by_lua_block { + local t = require("lib.test_admin").test + local code, body = t('/apisix/admin/protos/1', + ngx.HTTP_PUT, + [[{ + "content" : "syntax = \"proto3\"; + package helloworld; + service Greeter { + rpc SayHello (HelloRequest) returns (HelloReply) {} + } + message HelloRequest { + string name = 1; + } + message HelloReply { + string message = 1; + }" + }]] + ) + + if code >= 300 then + ngx.status = code + end + local http = require "resty.http" + local httpc = http.new() + local uri1 = "http://127.0.0.1:" .. ngx.var.server_port .. "/hello" + local headers = { + ["x-my-header-name"] = "william", + ["x-my-header-nick"] = "bill", + } + local res, err = httpc:request_uri(uri1, {method = "GET", headers = headers}) + if not res then + ngx.say(err) + return + end + ngx.status = res.status + } + } +--- wait: 1 +--- error_code: 200 +--- no_error_log +type 'opentelemetry.proto.trace.v1.TracesData' does not exists +--- grep_error_log eval +qr/attribute .+?:.[^,]*/ +--- grep_error_log_out +attribute route: "route_name" +attribute service: "" +attribute x-my-header-name: "william" +attribute x-my-header-nick: "bill" From 914b99372cbd46bf160e373f37946b1117fe6c82 Mon Sep 17 00:00:00 2001 From: kingluo Date: Tue, 6 Jun 2023 14:17:11 +0800 Subject: [PATCH 2/4] fix PR --- t/plugin/opentelemetry-bugfix-pb-state.t | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/t/plugin/opentelemetry-bugfix-pb-state.t b/t/plugin/opentelemetry-bugfix-pb-state.t index 02e2d1b96bb4..b6f2e1052e24 100644 --- a/t/plugin/opentelemetry-bugfix-pb-state.t +++ b/t/plugin/opentelemetry-bugfix-pb-state.t @@ -70,7 +70,7 @@ run_tests; __DATA__ -=== TEST 3: set additional_attributes with match +=== TEST 1: set additional_attributes with match --- config location /t { content_by_lua_block { @@ -110,7 +110,7 @@ passed -=== TEST 4: opentelemetry expands headers +=== TEST 2: opentelemetry expands headers --- extra_init_by_lua local otlp = require("opentelemetry.trace.exporter.otlp") local orig_export_spans = otlp.export_spans From 0aa7c91ab9315f56439be160cecb36a67a401c50 Mon Sep 17 00:00:00 2001 From: kingluo Date: Tue, 6 Jun 2023 15:19:48 +0800 Subject: [PATCH 3/4] fix PR --- apisix/plugins/grpc-transcode/request.lua | 2 +- apisix/plugins/grpc-transcode/response.lua | 7 +++++-- 2 files changed, 6 insertions(+), 3 deletions(-) diff --git a/apisix/plugins/grpc-transcode/request.lua b/apisix/plugins/grpc-transcode/request.lua index d98c2fc7a59e..934a1c95657c 100644 --- a/apisix/plugins/grpc-transcode/request.lua +++ b/apisix/plugins/grpc-transcode/request.lua @@ -36,10 +36,10 @@ return function (proto, service, method, pb_option, deadline, default_values) req_read_body() + local pb_old_state = pb.state(proto.pb_state) util.set_options(proto, pb_option) local map_message = util.map_message(m.input_type, default_values or {}) - local pb_old_state = pb.state(proto.pb_state) local ok, encoded = pcall(pb.encode, m.input_type, map_message) pb.state(pb_old_state) diff --git a/apisix/plugins/grpc-transcode/response.lua b/apisix/plugins/grpc-transcode/response.lua index db4e656c3b81..3f1b7399dad0 100644 --- a/apisix/plugins/grpc-transcode/response.lua +++ b/apisix/plugins/grpc-transcode/response.lua @@ -99,7 +99,10 @@ return function(ctx, proto, service, method, pb_option, show_status_in_body, sta -- handle error response after the last response chunk if ngx.status >= 300 and show_status_in_body then - return handle_error_response(status_detail_type) + local pb_old_state = pb.state(proto.pb_state) + local ret = handle_error_response(status_detail_type) + pb.state(pb_old_state) + return ret end -- when body has already been read by other plugin @@ -118,10 +121,10 @@ return function(ctx, proto, service, method, pb_option, show_status_in_body, sta buffer = string.sub(buffer, 6) end + local pb_old_state = pb.state(proto.pb_state) util.set_options(proto, pb_option) local err_msg - local pb_old_state = pb.state(proto.pb_state) local decoded = pb.decode(m.output_type, buffer) pb.state(pb_old_state) if not decoded then From 0ed3b4524b0e7886c8e80d9beb5de53e57c17b53 Mon Sep 17 00:00:00 2001 From: kingluo Date: Wed, 7 Jun 2023 13:43:57 +0800 Subject: [PATCH 4/4] fix PR --- apisix/plugins/grpc-transcode/response.lua | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/apisix/plugins/grpc-transcode/response.lua b/apisix/plugins/grpc-transcode/response.lua index 3f1b7399dad0..9dd6780f049d 100644 --- a/apisix/plugins/grpc-transcode/response.lua +++ b/apisix/plugins/grpc-transcode/response.lua @@ -25,7 +25,7 @@ local ipairs = ipairs local pcall = pcall -local function handle_error_response(status_detail_type) +local function handle_error_response(status_detail_type, proto) local err_msg local grpc_status = ngx.header["grpc-status-details-bin"] @@ -58,7 +58,9 @@ local function handle_error_response(status_detail_type) if status_detail_type and details then local decoded_details = {} for _, detail in ipairs(details) do + local pb_old_state = pb.state(proto.pb_state) local ok, err_or_value = pcall(pb.decode, status_detail_type, detail.value) + pb.state(pb_old_state) if not ok then err_msg = "failed to call pb.decode to decode details in " .. "grpc-status-details-bin" @@ -99,10 +101,7 @@ return function(ctx, proto, service, method, pb_option, show_status_in_body, sta -- handle error response after the last response chunk if ngx.status >= 300 and show_status_in_body then - local pb_old_state = pb.state(proto.pb_state) - local ret = handle_error_response(status_detail_type) - pb.state(pb_old_state) - return ret + return handle_error_response(status_detail_type, proto) end -- when body has already been read by other plugin