Skip to content

Commit

Permalink
feat(APIAnalytics) cosocket http and full ALF
Browse files Browse the repository at this point in the history
  • Loading branch information
thibaultcha committed Jun 2, 2015
1 parent ddd82ea commit 786d538
Show file tree
Hide file tree
Showing 11 changed files with 150 additions and 88 deletions.
7 changes: 3 additions & 4 deletions kong/kong.lua
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ local cache = require "kong.tools.database_cache"
local stringy = require "stringy"
local constants = require "kong.constants"
local responses = require "kong.tools.responses"
local timestamp = require "kong.tools.timestamp"

-- Define the plugins to load here, in the appropriate order
local plugins = {}
Expand Down Expand Up @@ -170,7 +169,7 @@ end
-- Calls plugins_access() on every loaded plugin
function _M.exec_plugins_access()
-- Setting a property that will be available for every plugin
ngx.ctx.started_at = timestamp.get_utc()
ngx.ctx.started_at = ngx.req.start_time()
ngx.ctx.plugin_conf = {}

-- Iterate over all the plugins
Expand Down Expand Up @@ -200,12 +199,12 @@ function _M.exec_plugins_access()
end
ngx.var.backend_url = final_url

ngx.ctx.proxy_started_at = timestamp.get_utc() -- Setting a property that will be available for every plugin
ngx.ctx.proxy_started_at = ngx.now() -- Setting a property that will be available for every plugin
end

-- Calls header_filter() on every loaded plugin
function _M.exec_plugins_header_filter()
ngx.ctx.proxy_ended_at = timestamp.get_utc() -- Setting a property that will be available for every plugin
ngx.ctx.proxy_ended_at = ngx.now() -- Setting a property that will be available for every plugin

if not ngx.ctx.stop_phases then
ngx.header["Via"] = constants.NAME.."/"..constants.VERSION
Expand Down
90 changes: 63 additions & 27 deletions kong/plugins/apianalytics/handler.lua
Original file line number Diff line number Diff line change
@@ -1,12 +1,51 @@
local http = require "socket.http"
local ltn12 = require "ltn12"
local http = require "resty_http"
local BasePlugin = require "kong.plugins.base_plugin"
local ALFSerializer = require "kong.plugins.log_serializers.alf"

local http_client = require "kong.tools.http_client"
local APIANALYTICS_SOCKET = {
host = "localhost",
port = 58000,
path = "/alf_1.0.0"
}

--local SERVER_URL = "http://socket.apianalytics.com/"
local SERVER_URL = "http://localhost:58000/alf_1.0.0"
local function send_batch(premature, message)
local client = http:new()
client:set_timeout(1000) -- 1 sec

local ok, err = client:connect(APIANALYTICS_SOCKET.host, APIANALYTICS_SOCKET.port)
if not ok then
ngx.log(ngx.ERR, "[apianalytics] failed to connect to the socket: "..err)
return
end

local res, err = client:request({ path = APIANALYTICS_SOCKET.path, body = message })
if not res then
ngx.log(ngx.ERR, "[apianalytics] failed to send batch: "..err)
return
end

-- close connection, or put it into the connection pool
if res.headers["connection"] == "close" then
local ok, err = client:close()
if not ok then
ngx.log(ngx.ERR, "[apianalytics] failed to close: "..err)
return
end
else
client:set_keepalive()
end

if res.status == 200 then
ALFSerializer:flush_entries()
ngx.log(ngx.DEBUG, "[apianalytics] successfully saved the batch")
else
ngx.log(ngx.ERR, "[apianalytics] socket refused the batch: "..res.body)
end
end

--
--
--

local APIAnalyticsHandler = BasePlugin:extend()

Expand All @@ -18,41 +57,38 @@ function APIAnalyticsHandler:access(conf)
APIAnalyticsHandler.super.access(self)

ngx.req.read_body()
ngx.ctx.req_body = ngx.req.get_body_data()
ngx.ctx.res_body = ""
ngx.ctx.apianalytics = {
req_body = ngx.req.get_body_data(),
res_body = ""
}
end

function APIAnalyticsHandler:body_filter(conf)
APIAnalyticsHandler.super.body_filter(self)

-- concatenate response chunks for response.content.text
local chunk = ngx.arg[1]
ngx.ctx.res_body = ngx.ctx.res_body..chunk
-- concatenate response chunks for ALF's `response.content.text`
local chunk, eof = ngx.arg[1], ngx.arg[2]
ngx.ctx.apianalytics.res_body = ngx.ctx.apianalytics.res_body..chunk

if eof then -- latest chunk
ngx.ctx.apianalytics.response_received = ngx.now()
end
end

function APIAnalyticsHandler:log(conf)
APIAnalyticsHandler.super.log(self)

ALFSerializer:add_entry(ngx)

-- if queue is full
local message = ALFSerializer:to_json_string("54d2b98ee0d5076065fd6f93")
print("MESSAGE: "..message)
local entries_size = ALFSerializer:add_entry(ngx)

-- TODO: use the cosocket API
local response, status, headers = http_client.post(SERVER_URL, message,
{
["content-length"] = string.len(message),
["content-type"] = "application/json"
})
if entries_size > 2 then
local message = ALFSerializer:to_json_string("54d2b98ee0d5076065fd6f93")
print("MESSAGE: "..message)

print("STATUS: "..status)
if status ~= 200 then
ngx.log(ngx.ERR, "Could not send entry to "..SERVER_URL)
print("RESPONSE IS: "..response)
local ok, err = ngx.timer.at(0, send_batch, message)
if not ok then
ngx.log(ngx.ERR, "[apianalytics] failed to create timer: ", err)
end
end

-- todo: flush
end

return APIAnalyticsHandler
5 changes: 4 additions & 1 deletion kong/plugins/apianalytics/schema.lua
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
return {}
return {
serviceToken = { type = "string", required = true },
log_body = { type = "boolean", default = true }
}
8 changes: 3 additions & 5 deletions kong/plugins/filelog/handler.lua
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
-- Copyright (C) Mashape, Inc.

local basic_serializer = require "kong.plugins.log_serializers.basic"
local json = require "cjson"
local BasePlugin = require "kong.plugins.base_plugin"
local cjson = require "cjson"
local basic_serializer = require "kong.plugins.log_serializers.basic"

local FileLogHandler = BasePlugin:extend()

Expand All @@ -14,7 +12,7 @@ function FileLogHandler:log(conf)
FileLogHandler.super.log(self)

local message = basic_serializer.serialize(ngx)
ngx.log(ngx.INFO, cjson.encode(message))
ngx.log(ngx.INFO, json.encode(message))
end

return FileLogHandler
5 changes: 4 additions & 1 deletion kong/plugins/httplog/handler.lua
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
local basic_serializer = require "kong.plugins.log_serializers.basic"
local BasePlugin = require "kong.plugins.base_plugin"
local log = require "kong.plugins.httplog.log"

Expand All @@ -9,7 +10,9 @@ end

function HttpLogHandler:log(conf)
HttpLogHandler.super.log(self)
log.execute(conf)

local message = basic_serializer.serialize(ngx)
log.execute(conf, message)
end

return HttpLogHandler
107 changes: 67 additions & 40 deletions kong/plugins/log_serializers/alf.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,16 @@
-- - Nginx lua module documentation: http://wiki.nginx.org/HttpLuaModule
-- - ngx_http_core_module: http://wiki.nginx.org/HttpCoreModule#.24http_HEADER

local json = require "cjson"

local EMPTY_ARRAY_PLACEHOLDER = "__empty_array_placeholder__"

local alf_mt = {}
alf_mt.__index = alf_mt

local ALF = {
version = "1.0.0",
serviceToken = "",
serviceToken = "", -- will be filled by to_json_string()
har = {
log = {
version = "1.2",
Expand All @@ -25,16 +27,26 @@ local ALF = {
}
}

-- Transform a key/value lua table into an array of elements with `name`, `value`
-- Transform a key/value lua table into an array of elements with `name`, `value`.
-- Since Lua won't recognize {} as an empty array but an empty object, we need to force it
-- to be an array, hence we will do "[__empty_array_placeholder__]".
-- Then once the ALF will be stringified, we will remove the placeholder so the only left element will be "[]".
-- @param hash key/value dictionary to serialize
-- @return an array, or nil
local function dic_to_array(hash)
-- @param `hash` key/value dictionary to serialize.
-- @param `fn` Some function to execute at each key iteration, with the key and value as parameters.
-- @return `array` an array, or nil
local function dic_to_array(hash, fn)
if not fn then fn = function() end end
local arr = {}
for k, v in pairs(hash) do
table.insert(arr, { name = k, value = v })
-- If the key has multiple values, v will be an array of all those values for the same key
-- hence we have to add multiple entries to the output array for that same key.
if type(v) ~= "table" then
v = {v}
end
for _, val in ipairs(v) do
table.insert(arr, { name = k, value = val })
fn(k, val)
end
end

if #arr > 0 then
Expand All @@ -44,88 +56,103 @@ local function dic_to_array(hash)
end
end

-- Serialize into one ALF entry
-- For performance reasons, it tries to use the NGINX Lua API
-- instead of ngx_http_core_module when possible.
-- Round a number to the third decimal.
-- http://lua-users.org/wiki/SimpleRound
local function round(num)
return math.floor(num * 10^3 + 0.5) / 10^3
end

-- Serialize `ngx` into one ALF entry.
-- For performance reasons, it tries to use the NGINX Lua API instead of
-- ngx_http_core_module when possible.
function alf_mt:serialize_entry(ngx)
-- Extracted data
local req_headers = ngx.req.get_headers()
local res_headers = ngx.resp.get_headers()

local req_body = ngx.ctx.req_body
local res_body = ngx.ctx.res_body

-- ALF format
local apianalytics_data = ngx.ctx.apianalytics
local req_body = apianalytics_data.req_body
local res_body = apianalytics_data.res_body

local started_at = ngx.ctx.started_at

-- ALF properties
-- timers
local send_time = round(ngx.ctx.proxy_started_at - started_at)
local wait_time = round(ngx.ctx.proxy_ended_at - ngx.ctx.proxy_started_at)
local receive_time = round(apianalytics_data.response_received - ngx.ctx.proxy_ended_at)
-- headers and headers size
local req_headers_str, res_headers_str= "", ""
local req_headers_arr = dic_to_array(req_headers, function(k, v) req_headers_str = req_headers_str..k..v end)
local res_headers_arr = dic_to_array(res_headers, function(k, v) res_headers_str = res_headers_str..k..v end)
local req_headers_size = string.len(req_headers_str)
local res_headers_size = string.len(res_headers_str)
-- values extracted from headers
local alf_req_mimeType = req_headers["Content-Type"] and req_headers["Content-Type"] or "application/octet-stream"
local alf_res_mimeType = res_headers["Content-Type"] and res_headers["Content-Type"] or "application/octet-stream"
local alf_req_bodySize = req_headers["Content-Length"] and req_headers["Content-Length"] or 0
local alf_res_bodySize = res_headers["Content-Length"] and res_headers["Content-Length"] or 0
local alf_req_bodySize = req_headers["Content-Length"] and req_headers["Content-Length"] or -1

return {
startedDateTime = os.date("!%Y-%m-%dT%TZ", ngx.req.start_time()),
startedDateTime = os.date("!%Y-%m-%dT%TZ", started_at),
clientIPAddress = ngx.var.remote_addr,
time = 3,
-- REQUEST
time = send_time + wait_time + receive_time,
request = {
method = ngx.req.get_method(),
url = ngx.var.scheme.."://"..ngx.var.host..ngx.var.uri,
httpVersion = "HTTP/"..ngx.req.http_version(),
queryString = dic_to_array(ngx.req.get_uri_args()),
headers = dic_to_array(req_headers),
headersSize = 10,
headers = req_headers_arr,
headersSize = req_headers_size,
cookies = {EMPTY_ARRAY_PLACEHOLDER},
bodySize = tonumber(alf_req_bodySize),
content = {
size = tonumber(ngx.var.request_length),
postData = {
mimeType = alf_req_mimeType,
params = dic_to_array(ngx.req.get_post_args()),
text = req_body and req_body or ""
}
},
-- RESPONSE
response = {
status = ngx.status,
statusText = "",
httpVersion = "",
headers = dic_to_array(res_headers),
headersSize = 10,
statusText = "", -- can't find a way to retrieve that
httpVersion = "", -- can't find a way to retrieve that either
headers = res_headers_arr,
headersSize = res_headers_size,
cookies = {EMPTY_ARRAY_PLACEHOLDER},
bodySize = tonumber(alf_res_bodySize),
bodySize = tonumber(ngx.var.body_bytes_sent),
redirectURL = "",
content = {
size = tonumber(ngx.var.bytes_sent),
size = tonumber(ngx.var.body_bytes_sent),
mimeType = alf_res_mimeType,
text = res_body and res_body or ""
}
},
cache = {},
-- TIMINGS
timings = {
send = 1,
wait = 1,
receive = 1,
blocked = 0,
connect = 0,
dns = 0,
ssl = 0
send = send_time,
wait = wait_time,
receive = receive_time,
blocked = -1,
connect = -1,
dns = -1,
ssl = -1
}
} -- end of entry
end

function alf_mt:add_entry(ngx)
table.insert(self.har.log.entries, self:serialize_entry(ngx))
return table.getn(self.har.log.entries)
end

function alf_mt:to_json_string(token)
if not token then
error("API Analytics serviceToken required", 2)
end

local cjson = require "cjson"

-- inject token
self.serviceToken = token

local str = cjson.encode(self)
local str = json.encode(self)
return str:gsub("\""..EMPTY_ARRAY_PLACEHOLDER.."\"", ""):gsub("\\/", "/")
end

Expand Down
1 change: 1 addition & 0 deletions kong/plugins/log_serializers/basic.lua
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ local _M = {}

function _M.serialize(ngx)
return {
request = {
uri = ngx.var.request_uri,
request_uri = ngx.var.scheme.."://"..ngx.var.host..":"..ngx.var.server_port..ngx.var.request_uri,
querystring = ngx.req.get_uri_args(), -- parameters, as a table
Expand Down
4 changes: 1 addition & 3 deletions kong/plugins/udplog/handler.lua
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
-- Copyright (C) Mashape, Inc.

local basic_serializer = require "kong.plugins.log_serializers.basic"
local log = require "kong.plugins.udplog.log"
local BasePlugin = require "kong.plugins.base_plugin"
local basic_serializer = require "kong.plugins.log_serializers.basic"

local UdpLogHandler = BasePlugin:extend()

Expand Down
Loading

0 comments on commit 786d538

Please sign in to comment.