From fc9985570999ae8521834b19d7bf29d400048a55 Mon Sep 17 00:00:00 2001
From: Thibault Charbonnier
Date: Tue, 11 Aug 2015 21:27:13 -0700
Subject: [PATCH 1/2] hotfix(analytics) more robust buffer

- Fix a critical issue where the buffer was not flushing itself when
  reaching the max size (in bytes).
- Better logging in case of a refused batch (number of items, size of
  the batch).
- Dropping the batch if it is refused by the socket server (HTTP 400).
- If a batch is not accepted for another reason, it is now pushed to the
  **end** of the queue instead of letting it block the queue.
- Dropping single ALF objects that exceed the max byte size by themselves.
- Retry timer now waits 2 seconds.
- Tests for reaching the size limit.
---
 kong/plugins/log_serializers/alf.lua          |  2 +-
 kong/plugins/mashape-analytics/buffer.lua     | 48 +++++++++++++------
 .../mashape-analytics/alf_serializer_spec.lua |  2 +-
 .../plugins/mashape-analytics/buffer_spec.lua | 47 +++++++++++++++++-
 4 files changed, 81 insertions(+), 18 deletions(-)

diff --git a/kong/plugins/log_serializers/alf.lua b/kong/plugins/log_serializers/alf.lua
index 7467658c131..887cfb8a99c 100644
--- a/kong/plugins/log_serializers/alf.lua
+++ b/kong/plugins/log_serializers/alf.lua
@@ -182,7 +182,7 @@ function _M.new_alf(ngx, token, environment)
     version = "1.2",
     creator = {
       name = "mashape-analytics-agent-kong",
-      version = "1.0.1"
+      version = "1.0.2"
     },
     entries = {_M.serialize_entry(ngx)}
   }
diff --git a/kong/plugins/mashape-analytics/buffer.lua b/kong/plugins/mashape-analytics/buffer.lua
index 1b486ea90eb..5230d2043e0 100644
--- a/kong/plugins/mashape-analytics/buffer.lua
+++ b/kong/plugins/mashape-analytics/buffer.lua
@@ -1,7 +1,7 @@
 -- ALF buffer module
 --
 -- This module contains a buffered array of ALF objects. When the buffer is full (max number of entries
--- or max payload size), it is converted to a JSON payload and moved to another buffer of payloads to be
+-- or max payload size), it is converted to a JSON payload and moved to a queue of payloads to be
 -- sent to the server.
 --
 -- 1 buffer of ALFs (gets flushed once it reaches the max size)
@@ -30,6 +30,7 @@ local ANALYTICS_SOCKET = {

 local buffer_mt = {}
 buffer_mt.__index = buffer_mt
+buffer_mt.MAX_BUFFER_SIZE = MAX_BUFFER_SIZE

 -- A handler for delayed batch sending. When no call has been made for X seconds
 -- (X being conf.delay), we send the batch to keep analytics as close to real-time
@@ -85,8 +86,15 @@ function buffer_mt:add_alf(alf)
   local next_n_entries = #self.entries + 1
   local alf_size = string.len(str)

+  -- If the ALF exceeds the payload size limit by itself, it can never be sent
+  if alf_size > self.MAX_SIZE then
+    ngx.log(ngx.ERR, string.format("[mashape-analytics] ALF size exceeded the maximum size (%sMB) accepted by the socket server. Dropping it.",
+                                   self.MAX_SIZE / MB))
+    return
+  end
+
   -- If size or entries exceed the max limits
-  local full = next_n_entries > self.MAX_ENTRIES or self:get_size() > self.MAX_SIZE
+  local full = next_n_entries > self.MAX_ENTRIES or (self:get_size() + alf_size) > self.MAX_SIZE

   if full then
     self:flush() -- Batch size reached, let's send the data
@@ -129,7 +137,11 @@ end
 -- 3. Empty the buffer and reset the current buffer size
 function buffer_mt:flush()
   local payload = self:payload_string()
-  table.insert(self.sending_queue, payload)
+  table.insert(self.sending_queue, {
+    payload = payload,
+    n_entries = #self.entries,
+    size = self:get_size()
+  })
   self.entries = {}
   self.entries_size = 0
 end
@@ -145,23 +157,29 @@ function buffer_mt.send_batch(premature, self)
     return
   end

-  -- Let's send the oldest payload in our buffer
-  local message = self.sending_queue[1]
+  -- Let's send the oldest batch in our queue
+  local batch_to_send = table.remove(self.sending_queue, 1)

-  local batch_saved = false
+  local drop_batch = false

   local client = http:new()
   client:set_timeout(50000) -- 5 sec

   local ok, err = client:connect(ANALYTICS_SOCKET.host, ANALYTICS_SOCKET.port)
   if ok then
-    local res, err = client:request({path = ANALYTICS_SOCKET.path, body = message})
+    local res, err = client:request({path = ANALYTICS_SOCKET.path, body = batch_to_send.payload})
     if not res then
-      ngx.log(ngx.ERR, "[mashape-analytics] failed to send batch: "..err)
+      ngx.log(ngx.ERR, string.format("[mashape-analytics] failed to send batch (%s ALFs, %s bytes): %s",
+                                     batch_to_send.n_entries, batch_to_send.size, err))
     elseif res.status == 200 then
-      batch_saved = true
+      drop_batch = true
       ngx.log(ngx.DEBUG, string.format("[mashape-analytics] successfully saved the batch. (%s)", res.body))
+    elseif res.status == 400 then
+      ngx.log(ngx.ERR, string.format("[mashape-analytics] socket server refused the batch (%s ALFs, %s bytes). Dropping batch. Status: (%s) Error: (%s)",
+                                     batch_to_send.n_entries, batch_to_send.size, res.status, res.body))
+      drop_batch = true
     else
-      ngx.log(ngx.ERR, string.format("[mashape-analytics] socket server refused the batch. Status: (%s) Error: (%s)", res.status, res.body))
+      ngx.log(ngx.ERR, string.format("[mashape-analytics] socket server could not save the batch (%s ALFs, %s bytes). Status: (%s) Error: (%s)",
+                                     batch_to_send.n_entries, batch_to_send.size, res.status, res.body))
     end
     -- close connection, or put it into the connection pool
@@ -177,16 +195,16 @@ function buffer_mt.send_batch(premature, self)
     ngx.log(ngx.ERR, "[mashape-analytics] failed to connect to the socket server: "..err)
   end

-  if batch_saved then
-    -- Remove the payload that was sent
-    table.remove(self.sending_queue, 1)
+  if not drop_batch then
+    -- If the batch was not dropped, add it back to the end of the queue; it will be retried later
+    table.insert(self.sending_queue, batch_to_send)
   end

   self.lock_sending = false

-  -- Keep sendind data if the buffer is not yet emptied
+  -- Keep sending data if the queue is not yet empty
   if #self.sending_queue > 0 then
-    local ok, err = ngx.timer.at(0, self.send_batch, self)
+    local ok, err = ngx.timer.at(2, self.send_batch, self)
     if not ok then
       ngx.log(ngx.ERR, "[mashape-analytics] failed to create batch retry timer: ", err)
     end
diff --git a/spec/plugins/mashape-analytics/alf_serializer_spec.lua b/spec/plugins/mashape-analytics/alf_serializer_spec.lua
index a076cedaadd..5c3cebcd993 100644
--- a/spec/plugins/mashape-analytics/alf_serializer_spec.lua
+++ b/spec/plugins/mashape-analytics/alf_serializer_spec.lua
@@ -90,7 +90,7 @@ describe("ALF serializer", function()
       assert.equal("1.2", alf.har.log.version)
       assert.truthy(alf.har.log.creator)
       assert.equal("mashape-analytics-agent-kong", alf.har.log.creator.name)
-      assert.equal("1.0.1", alf.har.log.creator.version)
+      assert.equal("1.0.2", alf.har.log.creator.version)
       assert.truthy(alf.har.log.entries)
       assert.equal(1, #(alf.har.log.entries))
     end)
diff --git a/spec/plugins/mashape-analytics/buffer_spec.lua b/spec/plugins/mashape-analytics/buffer_spec.lua
index 3d22150e5d2..9c34e73395a 100644
--- a/spec/plugins/mashape-analytics/buffer_spec.lua
+++ b/spec/plugins/mashape-analytics/buffer_spec.lua
@@ -64,11 +64,56 @@ describe("ALFBuffer", function()
         alf_buffer.flush:revert()
       end)
     end)
+    it("should call :flush() when reaching its max size", function()
+      -- How many stubs are needed to reach the limit?
+      local COMMA_LEN = string.len(",")
+      local JSON_ARR_LEN = string.len("[]")
+      local max_n_stubs = math.ceil(ALFBuffer.MAX_BUFFER_SIZE / (STUB_LEN + COMMA_LEN)) -- + the comma after each ALF in the JSON payload
+
+      -- Create a new buffer
+      local buffer = ALFBuffer.new({batch_size = max_n_stubs + 100, delay = 2})
+
+      local s = spy.on(buffer, "flush")
+
+      -- Add max_n_stubs - 1 entries
+      for i = 1, max_n_stubs - 1 do
+        buffer:add_alf(ALF_STUB)
+      end
+
+      assert.spy(s).was_not_called()

+      -- The current buffer size should be `(max_n_stubs - 1) * (STUB_LEN + COMMA_LEN) + JSON_ARR_LEN - COMMA_LEN`
+      -- (no comma after the last object)
+      assert.equal((max_n_stubs - 1) * (STUB_LEN + COMMA_LEN) + JSON_ARR_LEN - COMMA_LEN, buffer:get_size())
+
+      -- Adding one more entry should trigger a flush
+      buffer:add_alf(ALF_STUB)
+      assert.spy(s).was.called()
+    end)
+    it("should drop an ALF if it is too big by itself", function()
+      local str = string.rep(".", ALFBuffer.MAX_BUFFER_SIZE)
+      local huge_alf = {foo = str}
+
+      local buffer = ALFBuffer.new(CONF_STUB)
+
+      local s = spy.on(_G.ngx, "log")
+
+      buffer:add_alf(huge_alf)
+
+      assert.spy(s).was.called()
+      assert.equal(0, buffer.entries_size)
+      assert.equal(0, #buffer.entries)
+
+      finally(function()
+        _G.ngx.log:revert()
+      end)
+    end)
     describe(":flush()", function()
       it("should have emptied the current buffer and added a payload to be sent", function()
         assert.equal(1, #alf_buffer.entries)
         assert.equal(1, #alf_buffer.sending_queue)
-        assert.equal("string", type(alf_buffer.sending_queue[1]))
+        assert.equal("table", type(alf_buffer.sending_queue[1]))
+        assert.equal("string", type(alf_buffer.sending_queue[1].payload))
         assert.equal(STUB_LEN, alf_buffer.entries_size)
       end)
     end)
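Editor's note: the following is an illustration added for review, not part of the patch above. It reduces the new retry behavior to a standalone, runnable Lua sketch. The names queue, try_send and the batch_* strings are hypothetical stand-ins for the plugin's sending_queue and its HTTP call: a batch that fails to send is re-inserted at the end of the queue instead of staying at the head.

local queue = {"batch_a", "batch_b", "batch_c"}

local failed_once = false
local function try_send(batch)
  -- Stand-in for the HTTP round-trip to the socket server;
  -- pretend "batch_a" fails on its first attempt.
  if batch == "batch_a" and not failed_once then
    failed_once = true
    return false
  end
  return true
end

local batch = table.remove(queue, 1) -- pop the oldest batch
if not try_send(batch) then
  table.insert(queue, batch)         -- on failure, requeue at the END
end

print(table.concat(queue, ", "))     --> batch_b, batch_c, batch_a

Re-queueing at the end trades strict ordering for liveness: a single unsendable batch (other than an HTTP 400, which is now dropped) can no longer starve the batches behind it.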
From 20a351314bf161918d2535b96dc026c06f5e6d41 Mon Sep 17 00:00:00 2001
From: Thibault Charbonnier
Date: Wed, 12 Aug 2015 17:11:23 -0700
Subject: [PATCH 2/2] fix(analytics) warn why request body can't be read

---
 kong/plugins/log_serializers/alf.lua        |  3 ++-
 kong/plugins/mashape-analytics/handler.lua  | 17 +++++++++++++++--
 .../mashape-analytics/fixtures/requests.lua |  8 ++++----
 3 files changed, 21 insertions(+), 7 deletions(-)

diff --git a/kong/plugins/log_serializers/alf.lua b/kong/plugins/log_serializers/alf.lua
index 887cfb8a99c..f673d3a2eb1 100644
--- a/kong/plugins/log_serializers/alf.lua
+++ b/kong/plugins/log_serializers/alf.lua
@@ -65,6 +65,7 @@ function _M.serialize_entry(ngx)
   local alf_req_body = analytics_data.req_body or ""
   local alf_res_body = analytics_data.res_body or ""
+  local alf_req_post_args = analytics_data.req_post_args or {}

   -- timers
   local proxy_started_at, proxy_ended_at = ngx.ctx.proxy_started_at, ngx.ctx.proxy_ended_at

@@ -128,7 +129,7 @@
       bodySize = string.len(alf_req_body),
       postData = {
         mimeType = alf_req_mimeType,
-        params = dic_to_array(ngx.req.get_post_args()),
+        params = dic_to_array(alf_req_post_args),
         text = alf_req_body
       }
     },
diff --git a/kong/plugins/mashape-analytics/handler.lua b/kong/plugins/mashape-analytics/handler.lua
index 94c8e1df4ea..fb337f851dd 100644
--- a/kong/plugins/mashape-analytics/handler.lua
+++ b/kong/plugins/mashape-analytics/handler.lua
@@ -32,11 +32,24 @@ function AnalyticsHandler:access(conf)
   -- Retrieve and keep in memory the bodies for this request
   ngx.ctx.analytics = {
     req_body = "",
-    res_body = ""
+    res_body = "",
+    req_post_args = {}
   }

+  ngx.req.read_body()
+
+  local status, res = pcall(ngx.req.get_post_args)
+  if not status then
+    if res == "requesty body in temp file not supported" then -- ngx_lua's error string ("requesty" included)
+      ngx.log(ngx.ERR, "[mashape-analytics] cannot read request body from temporary file. Try increasing the client_body_buffer_size directive.")
+    else
+      ngx.log(ngx.ERR, res)
+    end
+  else
+    ngx.ctx.analytics.req_post_args = res
+  end
+
   if conf.log_body then
-    ngx.req.read_body()
     ngx.ctx.analytics.req_body = ngx.req.get_body_data()
   end
 end
diff --git a/spec/plugins/mashape-analytics/fixtures/requests.lua b/spec/plugins/mashape-analytics/fixtures/requests.lua
index 4a5fa0c87da..9559a09a51b 100644
--- a/spec/plugins/mashape-analytics/fixtures/requests.lua
+++ b/spec/plugins/mashape-analytics/fixtures/requests.lua
@@ -8,8 +8,7 @@ return {
         get_method = function() return "GET" end,
         http_version = function() return 1.1 end,
         get_headers = function() return {["Accept"] = "/*/", ["Host"] = "mockbin.com"} end,
-        get_uri_args = function() return {["hello"] = "world", ["foo"] = "bar", ["number"] = 2} end,
-        get_post_args = function() return {["hello"] = {"world", "earth"}} end
+        get_uri_args = function() return {["hello"] = "world", ["foo"] = "bar", ["number"] = 2} end
       },
       resp = {
         get_headers = function() return {["Connection"] = "close", ["Content-Type"] = "application/json", ["Content-Length"] = "934"} end
@@ -30,6 +29,7 @@ return {
       analytics = {
         req_body = "hello=world&hello=earth",
         res_body = "{\"message\":\"response body\"}",
+        req_post_args = {["hello"] = {"world", "earth"}},
         response_received = 143284457211
       }
     }
@@ -100,8 +100,7 @@ return {
         get_method = function() return "GET" end,
         http_version = function() return 1.1 end,
         get_headers = function() return {["Accept"] = "/*/", ["Host"] = "mockbin.com"} end,
-        get_uri_args = function() return {["hello"] = "world", ["foo"] = "bar"} end,
-        get_post_args = function() return {["hello"] = {"world", "earth"}} end
+        get_uri_args = function() return {["hello"] = "world", ["foo"] = "bar"} end
       },
       resp = {
         get_headers = function() return {["Connection"] = "close", ["Content-Type"] = "application/json", ["Content-Length"] = "934"} end
@@ -122,6 +121,7 @@ return {
       analytics = {
         req_body = "hello=world&hello=earth",
         res_body = "{\"message\":\"response body\"}",
+        req_post_args = {["hello"] = {"world", "earth"}},
         response_received = 143284457211
       }
     }
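Editor's note: a standalone illustration, not part of the patch above. Patch 2/2 wraps ngx.req.get_post_args in pcall because ngx_lua raises a Lua error (rather than returning nil plus an error) when the request body was buffered to a temporary file. The sketch below mimics that guard in plain Lua so it runs outside nginx; get_post_args here is a hypothetical stub raising the same message the handler compares against.

local function get_post_args()
  -- Stub for ngx.req.get_post_args; error level 0 keeps the message
  -- bare, matching the string checked by the handler.
  error("requesty body in temp file not supported", 0)
end

local ok, res = pcall(get_post_args)
if not ok then
  -- On failure, res holds the error message instead of the args table.
  print("[mashape-analytics] " .. res .. " (try increasing the client_body_buffer_size directive)")
else
  for k, v in pairs(res) do print(k, v) end
end

The pcall boundary keeps a body-parsing failure from aborting the whole access phase: the plugin logs a precise reason and falls back to an empty req_post_args table.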