
Another implementation of ingress controller #4388

Closed · zhixiongdu027 opened this issue Jun 8, 2021 · 18 comments

Comments

@zhixiongdu027
Contributor

Although there is an official implementation of the Ingress Controller, it is not practical in some scenarios:

1: Small-scale clusters. If your cluster has only 1~2 nodes, it cannot provide a stable etcd deployment environment. Even if you manage to deploy etcd somehow, it consumes additional resources.

2: A typical private k8s cluster has no stable external storage service. Even if the number of nodes is sufficient, deploying etcd is inconvenient.

3: A multi-tenant cluster isolated by namespace. An Ingress Controller acting as the single cluster entry point breaks tenant isolation, and even creating a new Ingress Controller Class for each tenant is not appropriate.

Based on the above reasons, I designed a new implementation of the Ingress Controller.
The main points of the implementation are as follows:

1: Define CRDs on the cluster
2: Deploy an apisix service in each namespace of the cluster, as (and only as) the traffic entrance of that namespace
3: Each apisix instance uses its privileged process to list-watch the namespaced CRD resources and write them to conf/apisix.yaml
4: Implement k8s discovery (list-watch the namespaced endpoints)

This way:
no additional development language or framework is needed,
no additional etcd is needed, and the data transfer path is shorter,
so it is more robust than the official implementation.

The only implementation difficulty may be the webhooks.
If they are developed with client-go, it is not convenient to verify the config schema and plugin schemas.
If they are implemented as an apisix plugin, it is not convenient to verify the uniqueness of service_id, upstream_id, and route_id because of the nginx multi-process model.
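
For the schema half of this problem, a Lua-based webhook could reuse the JSON Schemas APISIX already ships instead of re-implementing them in Go. A minimal sketch, assuming it runs in a context where apisix.core and the plugin modules are loadable (validate_plugin_conf is a hypothetical helper):

```lua
local core = require("apisix.core")

-- validate a plugin configuration against the schema the plugin module
-- itself exports (APISIX plugins define a `schema` table)
local function validate_plugin_conf(plugin_name, conf)
    local ok, plugin = pcall(require, "apisix.plugins." .. plugin_name)
    if not ok then
        return nil, "unknown plugin: " .. plugin_name
    end
    return core.schema.check(plugin.schema, conf)
end

-- e.g. validate_plugin_conf("limit-count",
--          { count = 2, time_window = 60, rejected_code = 503, key = "remote_addr" })
```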

@tokers
Contributor

tokers commented Jun 9, 2021

@adugeek Actually, we considered this plan, but the biggest obstacle is that we have to do many disk IO operations, especially for Service endpoint changes.

@zhixiongdu027
Contributor Author

> @adugeek Actually, we considered this plan, but the biggest obstacle is that we have to do many disk IO operations, especially for Service endpoint changes.

"Endpoints Change" is not a big problem, just provide a k8s discovery.

@tokers
Contributor

tokers commented Jun 9, 2021

> > @adugeek Actually, we considered this plan, but the biggest obstacle is that we have to do many disk IO operations, especially for Service endpoint changes.
>
> "Endpoints change" is not a big problem; just provide a k8s discovery module.

I mean that too-frequent endpoint changes will make the disk IO heavy.

@zhixiongdu027
Contributor Author

zhixiongdu027 commented Jun 9, 2021

> I mean that too-frequent endpoint changes will make the disk IO heavy.

I understand your concerns.
But what I mean is that endpoint changes are picked up by the k8s discovery and written to ngx.shared.DICT, not to apisix.yaml.
There is no disk IO there.

Talk is cheap. Let's look at the code:

```lua
local ipairs = ipairs
local ngx = ngx
local string = string
local tonumber = tonumber
local math = math
local os = os
local process = require("ngx.process")
local core = require("apisix.core")
local util = require("apisix.cli.util")
local http = require("resty.http")
local signal = require("resty.signal")
local ngx_timer_at = ngx.timer.at
local shared_endpoints = ngx.shared.discovery

local apiserver_host = ""
local apiserver_port = ""
local namespace = ""
local token = ""

local default_weight = 50

local lrucache = core.lrucache.new({
    ttl = 300,
    count = 1024
})

local cache_table = {}

local function end_world(reason)
    core.log.emerg(reason)
    signal.kill(process.get_master_pid(), signal.signum("QUIT"))
end

local function sort_by_key_host(a, b)
    return a.host < b.host
end

local function on_endpoint_added(endpoint)
    local subsets = endpoint.subsets
    if subsets == nil or #subsets == 0 then
        return
    end

    local subset = subsets[1]

    local addresses = subset.addresses
    if addresses == nil or #addresses == 0 then
        return
    end

    local ports = subset.ports
    if ports == nil or #ports == 0 then
        return
    end

    core.table.clear(cache_table)
    for _, port in ipairs(ports) do
        local nodes = core.table.new(#addresses, 0)
        for i, address in ipairs(addresses) do
            nodes[i] = {
                host = address.ip,
                port = port.port,
                weight = default_weight
            }
        end
        core.table.sort(nodes, sort_by_key_host)
        cache_table[port.name] = nodes
    end

    local _, err
    _, err = shared_endpoints:safe_set(endpoint.metadata.name .. "#version", endpoint.metadata.resourceVersion)
    if err then
        core.log.emerg("set endpoint version into discovery DICT failed, ", err)
    end

    _, err = shared_endpoints:safe_set(endpoint.metadata.name, core.json.encode(cache_table, true))
    if err then
        core.log.emerg("set endpoint into discovery DICT failed, ", err)
    end
end

local function on_endpoint_deleted(endpoint)
    shared_endpoints:delete(endpoint.metadata.name .. "#version")
    shared_endpoints:delete(endpoint.metadata.name)
end

local function on_endpoint_modified(endpoint)
    local subsets = endpoint.subsets
    if subsets == nil or #subsets == 0 then
        return on_endpoint_deleted(endpoint)
    end

    local subset = subsets[1]

    local addresses = subset.addresses
    if addresses == nil or #addresses == 0 then
        return on_endpoint_deleted(endpoint)
    end

    local ports = subset.ports
    if ports == nil or #ports == 0 then
        return on_endpoint_deleted(endpoint)
    end

    core.table.clear(cache_table)
    for _, port in ipairs(ports) do
        local nodes = core.table.new(#addresses, 0)
        for i, address in ipairs(addresses) do
            nodes[i] = {
                host = address.ip,
                port = port.port,
                weight = default_weight
            }
        end
        core.table.sort(nodes, sort_by_key_host)
        cache_table[port.name] = nodes
    end

    local _, err
    _, err = shared_endpoints:safe_set(endpoint.metadata.name .. "#version", endpoint.metadata.resourceVersion)
    if err then
        core.log.emerg("set endpoints version into discovery DICT failed, ", err)
    end

    _, err = shared_endpoints:safe_set(endpoint.metadata.name, core.json.encode(cache_table, true))
    if err then
        core.log.emerg("set endpoints into discovery DICT failed, ", err)
    end
end

local endpoint_resource = {
    version = "v1",
    kind = "Endpoints",
    listKind = "EndpointsList",
    plural = "endpoints",
    max_resource_version = 0,
    list_path = function(self)
        return string.format("/api/v1/namespaces/%s/endpoints", namespace)
    end,

    list_query = function(self, continue)
        if continue == nil or continue == "" then
            return "limit=45"
        else
            return "limit=45&continue=" .. continue
        end
    end,

    watch_path = function(self)
        return string.format("/api/v1/namespaces/%s/endpoints", namespace)
    end,
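    -- watch with bookmarks enabled and a server-side timeout, resuming from
    -- the highest resourceVersion seen so far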
    watch_query = function(self, timeout)
        return string.format("watch=1&allowWatchBookmarks=true&timeoutSeconds=%d&resourceVersion=%d", timeout,
                   self.max_resource_version)
    end,
    pre_list_callback = function(self)
        self.max_resource_version = 0
        shared_endpoints:flush_all()
    end,
    post_list_callback = function(self)
        shared_endpoints:flush_expired()
    end,
    added_callback = function(self, object, drive)
        on_endpoint_added(object)
    end,
    modified_callback = function(self, object)
        on_endpoint_modified(object)
    end,
    deleted_callback = function(self, object)
        on_endpoint_deleted(object)
    end
}

local function event_dispatch(resource, event, object, drive)
    if drive == "watch" then
        local resource_version = object.metadata.resourceVersion
        local rvv = tonumber(resource_version)
        if rvv <= resource.max_resource_version then
            return
        end
        resource.max_resource_version = rvv
    end

    if event == "ADDED" then
        resource:added_callback(object, drive)
    elseif event == "MODIFIED" then
        if object.deletionTimestamp ~= nil then
            resource:deleted_callback(object)
        else
            resource:modified_callback(object)
        end
    elseif event == "DELETED" then
        resource:deleted_callback(object)
    elseif event == "BOOKMARK" then
        -- do nothing: the bookmark's resourceVersion was already recorded above
    end
end

local function list_resource(httpc, resource, continue)
    httpc:set_timeouts(2000, 2000, 3000)
    local res, err = httpc:request({
        path = resource:list_path(),
        query = resource:list_query(),
        headers = {
            ["Authorization"] = string.format("Bearer %s", token)
        }
    })

    if not res then
        return false, "RequestError", err or ""
    end

    if res.status ~= 200 then
        return false, res.reason, res:read_body() or ""
    end

    local body, err = res:read_body()
    if err then
        return false, "ReadBodyError", err
    end

    local data, _ = core.json.decode(body)
    if not data or data.kind ~= resource.listKind then
        return false, "UnexpectedBody", body
    end

    local resource_version = data.metadata.resourceVersion
    resource.max_resource_version = tonumber(resource_version)

    for _, item in ipairs(data.items) do
        event_dispatch(resource, "ADDED", item, "list")
    end

    if data.metadata.continue ~= nil and data.metadata.continue ~= "" then
        return list_resource(httpc, resource, data.metadata.continue)
    end

    return true, "Success", ""
end

local function watch_resource(httpc, resource)
    math.randomseed(process.get_master_pid())
    local watch_seconds = 1800 + math.random(60, 1200)
    local allowance_seconds = 120
    httpc:set_timeouts(2000, 3000, (watch_seconds + allowance_seconds) * 1000)
    local res, err = httpc:request({
        path = resource:watch_path(),
        query = resource:watch_query(watch_seconds),
        headers = {
            ["Authorization"] = string.format("Bearer %s", token)
        }
    })

    if err then
        return false, "RequestError", err
    end

    if res.status ~= 200 then
        return false, res.reason, res.read_body and res:read_body()
    end

    local remainder_body = ""
    local body = ""
    local reader = res.body_reader
    local gmatch_iterator
    local captures
    local captured_size = 0
    while true do

        body, err = reader()
        if err then
            return false, "ReadBodyError", err
        end

        if not body then
            break
        end

        if #remainder_body ~= 0 then
            body = remainder_body .. body
        end

        gmatch_iterator, err = ngx.re.gmatch(body, "{\"type\":.*}\n", "jiao")
        if not gmatch_iterator then
            return false, "GmatchError", err
        end

        -- reset the per-chunk counter; otherwise the total accumulated from
        -- previous chunks corrupts the remainder calculation below
        captured_size = 0
        while true do
            captures, err = gmatch_iterator()
            if err then
                return false, "GmatchError", err
            end
            if not captures then
                break
            end
            captured_size = captured_size + #captures[0]
            local v, _ = core.json.decode(captures[0])
            if not v or not v.object or v.object.kind ~= resource.kind then
                return false, "UnexpectedBody", captures[0]
            end
            event_dispatch(resource, v.type, v.object, "watch")
        end

        if captured_size == #body then
            remainder_body = ""
        elseif captured_size == 0 then
            remainder_body = body
        else
            remainder_body = string.sub(body, captured_size + 1)
        end
    end
    -- the server-side timeout expired; restart the watch from the recorded
    -- resource version (a tail call, so its result propagates to the caller)
    return watch_resource(httpc, resource)
end

local function fetch_resource(resource)
    while true do
        local ok = false
        local reason, message = "", ""
        local intervalTime = 0
        repeat
            local httpc = http.new()
            resource.watch_state = "connecting"
            core.log.info("begin to connect ", resource.plural)
            ok, message = httpc:connect({
                scheme = "https",
                host = apiserver_host,
                port = tonumber(apiserver_port),
                ssl_verify = false
            })
            if not ok then
                resource.watch_state = "connect failed"
                core.log.error("connect apiserver failed, apiserver_host: ", apiserver_host,
                    ", apiserver_port: ", apiserver_port, ", message: ", message)
                intervalTime = 200
                break
            end

            core.log.info("begin to list ", resource.plural)
            resource.watch_state = "listing"
            resource:pre_list_callback()
            ok, reason, message = list_resource(httpc, resource)
            if not ok then
                resource.watch_state = "listing failed"
                core.log.error("list failed , resource: ", resource.plural, " reason: ", reason, "message : ", message)
                intervalTime = 200
                break
            end
            resource.watch_state = "list finished"
            resource:post_list_callback()

            core.log.info("begin to watch ", resource.plural)
            resource.watch_state = "watching"
            ok, reason, message = watch_resource(httpc, resource)
            if not ok then
                resource.watch_state = "watch failed"
                core.log.error("watch failed, resource: ", resource.plural, " reason: ", reason, "message : ", message)
                intervalTime = 100
                break
            end
            resource.watch_state = "watch finished"
            intervalTime = 0
        until true
        ngx.sleep(intervalTime)
    end
end

local function create_lrucache(service_name, port_name)
    local endpoint, _, _ = shared_endpoints:get_stale(service_name)
    if not endpoint then
        core.log.error("get empty endpoint from discovery DICT, this should not happen: ", service_name)
        return nil
    end

    local t, _ = core.json.decode(endpoint)
    if not t then
        core.log.error("json decode endpoint failed, this should not happen, content: ", endpoint)
        return nil
    end
    return t[port_name]
end

local _M = {
    version = 0.01
}

function _M.nodes(service_name)
    local pattern = "([a-z][a-z0-9-.]{0,62})[:]([a-z][a-z0-9-.]{0,62})$"
    local match, _ = ngx.re.match(service_name, pattern, "jiao")
    if not match then
        core.log.info("get unexpected upstream service_name: ", service_name)
        return nil
    end
    local k8s_service_name = match[1]
    local k8s_port_name = match[2]
    local version, _, err = shared_endpoints:get_stale(k8s_service_name .. "#version")
    if not version then
        core.log.info("get emppty endpoint version from discovery DICT ", k8s_service_name)
        return nil
    end
    return lrucache(service_name, version, create_lrucache, k8s_service_name, k8s_port_name)
end

function _M.init_worker()
    if process.type() ~= "privileged agent" then
        return
    end

    local err
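    -- NOTE: the two file paths below are local test stubs; in an in-cluster
    -- deployment, the namespace and token are normally mounted at
    -- /var/run/secrets/kubernetes.io/serviceaccount/{namespace,token}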
    namespace, err = util.read_file("/home/adugeek/Temp/namespace")
    if not namespace or namespace == "" then
        end_world("get empty namespace value " .. (err or ""))
        return
    end

    token, err = util.read_file("/home/adugeek/Temp/token")
    if not token or token == "" then
        end_world("get empty token value " .. (err or ""))
        return
    end

    apiserver_host = os.getenv("KUBERNETES_SERVICE_HOST")
    if not apiserver_host or apiserver_host == "" then
        end_world("get empty KUBERNETES_SERVICE_HOST value")
    end

    apiserver_port = os.getenv("KUBERNETES_SERVICE_PORT")
    if not apiserver_port or apiserver_port == "" then
        end_world("get empty KUBERNETES_SERVICE_PORT value")
    end

    ngx_timer_at(0, fetch_resource, endpoint_resource)
end

return _M
```
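
For context on how this module would be wired in: the shared dict it reads (ngx.shared.discovery) has to be declared in the nginx config (e.g. lua_shared_dict discovery 1m;), and an upstream resolves its nodes through _M.nodes() with a "<service>:<port-name>" key, per the regex above. A hypothetical call, assuming the file is installed as apisix/discovery/k8s.lua:

```lua
-- hypothetical module path; adjust to wherever the file above is installed
local k8s = require("apisix.discovery.k8s")

-- "<service-name>:<port-name>", matching the pattern in _M.nodes()
local nodes = k8s.nodes("my-service:http")
-- e.g. nodes => { { host = "10.0.0.1", port = 8080, weight = 50 }, ... }
```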

@tokers
Contributor

tokers commented Jun 10, 2021

All right, I thought you were going to write all configurations into apisix.yaml. It seems you want to modify APISIX so that it can communicate with the Kube API Server directly.

So how should we handle other data, like Routes and Consumers?

@zhixiongdu027
Contributor Author

For Routes, Services, Upstreams, ...

First: we can deploy CRDs, just like any other xController.

Then, we can develop a "controller.lua" as an apisix plugin.
Its function is to list-watch the CRDs and then write them to apisix.yaml (a sketch of this write path follows below).

If "Routes, Services, Upstreams" change frequently and cause heavy disk IO, we can develop an "apisix/core/config_memory.lua";
usually, this is not needed.
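
To make that write path concrete, here is a minimal sketch, assuming the watched CRs have already been translated into APISIX's standalone route format. flush_to_apisix_yaml is a hypothetical helper; JSON is syntactically valid YAML, so core.json may suffice for serialization (assuming the standalone parser accepts flow style), and standalone mode expects the file to end with an #END marker:

```lua
local core = require("apisix.core")

-- hypothetical helper: overwrite conf/apisix.yaml with the current snapshot
-- of routes translated from the watched CRDs
local function flush_to_apisix_yaml(conf_path, routes)
    local file, err = io.open(conf_path, "w")
    if not file then
        return nil, "open " .. conf_path .. " failed: " .. (err or "")
    end
    -- JSON is a subset of YAML; a real implementation might use a YAML emitter
    file:write(core.json.encode({ routes = routes }, true))
    -- standalone mode treats a trailing #END line as the marker of a complete
    -- write, so APISIX only reloads fully written files
    file:write("\n#END\n")
    file:close()
    return true
end
```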

@tokers
Contributor

tokers commented Jun 10, 2021

Well, this way might be clean and native if we implement the controller in Lua, but it is not a good idea from the perspective of ecosystem integration: we cannot enjoy the convenience of Kubernetes-related tools, packages, and even their community members.

cc @tao12345666333

@zhixiongdu027
Contributor Author

zhixiongdu027 commented Jun 13, 2021

I have committed this implementation.
The link is https://github.com/adugeek/ApisixController
@tokers

@tokers
Contributor

tokers commented Jun 14, 2021

@adugeek Wow, what a masterpiece! Are you using it in production now?

@zhixiongdu027
Contributor Author

Not yet; it is under testing now.

@lookbrook

The k8s endpoints discovery is very useful; I think it should be merged into apisix.

@tao12345666333
Member

> The k8s endpoints discovery is very useful; I think it should be merged into apisix.

I agree. @adugeek would you like to add a kubernetes discovery plugin for Apache APISIX?

@zou8944

zou8944 commented Aug 15, 2021

I think it is useful too. How is it going?

@tokers
Contributor

tokers commented Aug 15, 2021

> Not yet; it is under testing now.

@adugeek Any updates?

@zhixiongdu027
Contributor Author

zhixiongdu027 commented Aug 18, 2021

@tao12345666333 @tokers @lookbrook @zou8944
Sorry, I will create a PR in one week,
but I need someone to teach me how to create test cases.

@tokers
Contributor

tokers commented Aug 18, 2021

> @tao12345666333 @tokers @lookbrook @zou8944
> Sorry, I will create a PR in one week,
> but I need someone to teach me how to create test cases.

See https://github.com/apache/apisix/blob/master/docs/en/latest/internal/testing-framework.md for details.

@zhixiongdu027
Contributor Author

It's not about how to send requests and check responses with the "testing framework";

it's about how to create a k8s cluster, build an apisix Docker image, and deploy and scale test services within the "testing framework".

@tokers
Contributor

tokers commented Aug 22, 2021

@adugeek Just refer to the chaos mesh testing: https://github.com/apache/apisix/blob/master/.github/workflows/chaos.yml.
