Skip to content

Commit

Permalink
Merge pull request #1125 from Mashape/fix/ttl
Browse files Browse the repository at this point in the history
Fixing TTL in Cassandra
  • Loading branch information
subnetmarco committed Apr 6, 2016
2 parents c08b22a + 1f8b9c1 commit b1bf822
Show file tree
Hide file tree
Showing 8 changed files with 85 additions and 13 deletions.
6 changes: 6 additions & 0 deletions kong.yml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@
## Key for encrypting network traffic within Kong. Must be a base64-encoded 16-byte key.
# encrypt: "foo"

######
## The TTL (time to live), in seconds, of a node in the cluster when it stops sending healthcheck pings, maybe
## because of a failure. If the node is not able to send a new healthcheck before the expiration, then new nodes
## in the cluster will stop attempting to connect to it on startup.
# ttl_on_failure: 3600

######
## Specify which database to use. Only "cassandra" is currently available.
# database: cassandra
Expand Down
2 changes: 1 addition & 1 deletion kong/cli/services/serf.lua
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ function Serf:_add_node()
local _, err = self._dao_factory.nodes:insert({
name = name,
cluster_listening_address = stringy.strip(addr)
}, {ttl = 3600})
}, {ttl = self._configuration.cluster.ttl_on_failure})
if err then
return false, err
end
Expand Down
2 changes: 1 addition & 1 deletion kong/core/cluster.lua
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ local function send_keepalive(premature)
ngx.log(ngx.ERR, tostring(err))
elseif #nodes == 1 then
local node = nodes[1]
local _, err = singletons.dao.nodes:update(node, node)
local _, err = singletons.dao.nodes:update(node, node, {ttl=singletons.configuration.cluster.ttl_on_failure})
if err then
ngx.log(ngx.ERR, tostring(err))
end
Expand Down
26 changes: 25 additions & 1 deletion kong/dao/cassandra_db.lua
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ function CassandraDB:count(table_name, tbl, schema)
end
end

function CassandraDB:update(table_name, schema, constraints, filter_keys, values, nils, full, options)
function CassandraDB:update(table_name, schema, constraints, filter_keys, values, nils, full, model, options)
-- must check unique constaints manually too
local err = check_unique_constraints(self, table_name, constraints, values, filter_keys, true)
if err then
Expand All @@ -309,6 +309,30 @@ function CassandraDB:update(table_name, schema, constraints, filter_keys, values
return nil, err
end

-- Cassandra TTL on update is per-column and not per-row, and TTLs cannot be updated on primary keys.
-- Not only that, but TTL on other rows can only be incremented, and not decremented. Because of all
-- of these limitations, the only way to make this happen is to do an upsert operation.
-- This implementation can be changed once Cassandra closes this issue: https://issues.apache.org/jira/browse/CASSANDRA-9312
if options and options.ttl then
if schema.primary_key and #schema.primary_key == 1 and filter_keys[schema.primary_key[1]] then
local row, err = self:find(table_name, schema, filter_keys)
if err then
return nil, err
elseif row then
for k, v in pairs(row) do
if not values[k] then
model[k] = v -- Populate the model to be used later for the insert
end
end

-- Insert without any contraint check, since the check has already been executed
return self:insert(table_name, schema, model, {unique={}, foreign={}}, options)
end
else
return nil, "Cannot update TTL on entities that have more than one primary_key"
end
end

local sets, args, where = {}, {}
for col, value in pairs(values) do
local field = schema.fields[col]
Expand Down
2 changes: 1 addition & 1 deletion kong/dao/dao.lua
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ function DAO:update(tbl, filter_keys, options)
fix(old, values, self.schema)
end

local res, err = self.db:update(self.table, self.schema, self.constraints, primary_keys, values, nils, full_update, options)
local res, err = self.db:update(self.table, self.schema, self.constraints, primary_keys, values, nils, full_update, model, options)
if err then
return nil, err
elseif res then
Expand Down
16 changes: 13 additions & 3 deletions kong/dao/postgres_db.lua
Original file line number Diff line number Diff line change
Expand Up @@ -228,12 +228,22 @@ function PostgresDB:serialize_timestamps(tbl, schema)
end

function PostgresDB:ttl(tbl, table_name, schema, ttl)
if not schema.primary_key or #schema.primary_key > 1 then
if not schema.primary_key or #schema.primary_key ~= 1 then
return false, "Cannot set a TTL if the entity has no primary key, or has more than one primary key"
end

local primary_key_type = self:retrieve_primary_key_type(schema, table_name)
local expire_at = tbl.created_at + (ttl * 1000)

-- Get current server time
local query = "SELECT extract(epoch from now() at time zone 'utc')::bigint*1000 as timestamp;"
local res, err = self:query(query)
if err then
return false, err
end

-- The expiration is always based on the current time
local expire_at = res[1].timestamp + (ttl * 1000)

local query = string.format("SELECT upsert_ttl('%s', %s, '%s', '%s', to_timestamp(%d/1000) at time zone 'UTC')",
tbl[schema.primary_key[1]], primary_key_type == "uuid" and "'"..tbl[schema.primary_key[1]].."'" or "NULL",
schema.primary_key[1], table_name, expire_at)
Expand Down Expand Up @@ -364,7 +374,7 @@ function PostgresDB:count(table_name, tbl, schema)
end
end

function PostgresDB:update(table_name, schema, _, filter_keys, values, nils, full, options)
function PostgresDB:update(table_name, schema, _, filter_keys, values, nils, full, _, options)
local args = {}
local values, err = self:serialize_timestamps(values, schema)
if err then
Expand Down
3 changes: 2 additions & 1 deletion kong/tools/config_defaults.lua
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ return {
["auto-join"] = {type = "boolean", default = true},
["advertise"] = {type = "string", nullable = true},
["encrypt"] = {type = "string", nullable = true},
["profile"] = {type = "string", default = "wan", enum = {"wan", "lan", "local"}}
["profile"] = {type = "string", default = "wan", enum = {"wan", "lan", "local"}},
["ttl_on_failure"] = {type = "number", default = 3600}
}
},
["database"] = {type = "string", default = "cassandra", enum = {"cassandra", "postgres"}},
Expand Down
41 changes: 36 additions & 5 deletions spec/integration/dao/07-options_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@ helpers.for_each_dao(function(db_type, default_options, TYPES)
assert.falsy(row)
end)

it("on update", function()
it("on update - increase ttl", function()
local api, err = factory.apis:insert({
name = "mockbin", request_host = "mockbin.com",
upstream_url = "http://mockbin.com"
}, {ttl = 5})
}, {ttl = 3})
assert.falsy(err)

-- Retrieval
Expand All @@ -54,23 +54,54 @@ helpers.for_each_dao(function(db_type, default_options, TYPES)
assert.falsy(err)
assert.truthy(row)

os.execute("sleep 2")

-- Updating the TTL to a higher value
factory.apis:update({name = "mockbin2"}, {id = api.id}, {ttl = 10})
factory.apis:update({name = "mockbin2"}, {id = api.id}, {ttl = 3})

os.execute("sleep 5")
os.execute("sleep 2")

row, err = factory.apis:find {
id = api.id
}
assert.falsy(err)
assert.truthy(row)

os.execute("sleep 5")
os.execute("sleep 2")

-- It has now finally expired
row, err = factory.apis:find {
id = api.id
}
assert.falsy(err)
assert.falsy(row)
end)

it("on update - decrease ttl", function()
local api, err = factory.apis:insert({
name = "mockbin", request_host = "mockbin.com",
upstream_url = "http://mockbin.com"
}, {ttl = 10})
assert.falsy(err)

os.execute("sleep 3")

-- Retrieval
local row, err = factory.apis:find {
id = api.id
}
assert.falsy(err)
assert.truthy(row)

-- Updating the TTL to a lower value
local _, err = factory.apis:update({name = "mockbin2"}, {id = api.id}, {ttl = 3})
assert.falsy(err)

os.execute("sleep 4")

row, err = factory.apis:find {
id = api.id
}
assert.falsy(err)
assert.falsy(row)
end)
Expand Down

0 comments on commit b1bf822

Please sign in to comment.