forked from apache/apisix
-
Notifications
You must be signed in to change notification settings - Fork 0
/
eureka.lua
253 lines (222 loc) · 7.76 KB
/
eureka.lua
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
--
-- Licensed to the Apache Software Foundation (ASF) under one or more
-- contributor license agreements. See the NOTICE file distributed with
-- this work for additional information regarding copyright ownership.
-- The ASF licenses this file to You under the Apache License, Version 2.0
-- (the "License"); you may not use this file except in compliance with
-- the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing, software
-- distributed under the License is distributed on an "AS IS" BASIS,
-- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-- See the License for the specific language governing permissions and
-- limitations under the License.
--
local local_conf = require("apisix.core.config_local").local_conf()
local http = require("resty.http")
local core = require("apisix.core")
local ipmatcher = require("resty.ipmatcher")
local ipairs = ipairs
local tostring = tostring
local type = type
local math_random = math.random
local error = error
local ngx = ngx
local ngx_timer_at = ngx.timer.at
local ngx_timer_every = ngx.timer.every
local string_sub = string.sub
local string_find = string.find
local log = core.log
local default_weight
local applications
-- JSON schema the `eureka` section of the local configuration is checked
-- against in init_worker(). Only `host` is mandatory.
local schema = {
    type = "object",
    required = {"host"},
    properties = {
        -- Eureka server URLs; one entry is picked at random per request.
        host = {
            type = "array",
            minItems = 1,
            items = {type = "string"},
        },
        -- interval handed to the recurring registry refresh timer
        fetch_interval = {type = "integer", minimum = 1, default = 30},
        -- path fragment appended to the chosen host URL
        prefix = {type = "string"},
        -- weight given to nodes whose metadata carries no weight of its own
        weight = {type = "integer", minimum = 0},
        -- per-phase timeouts applied to the HTTP client
        timeout = {
            type = "object",
            properties = {
                connect = {type = "integer", minimum = 1, default = 2000},
                send = {type = "integer", minimum = 1, default = 2000},
                read = {type = "integer", minimum = 1, default = 5000},
            },
        },
    },
}
-- Module table returned at the end of this file; carries the module version.
local _M = {version = 0.1}
-- Pick one of the configured Eureka hosts at random and turn it into the
-- base URL used for registry requests.
--
-- Credentials embedded in the host entry ("scheme://user:pass@host") are
-- stripped from the URL and returned separately as a ready-to-use
-- "Basic ..." Authorization header value.
--
-- Returns: url (always ending in "/"), basic_auth (nil when the entry has
-- no credentials). Returns nothing, after logging an error, when no host is
-- configured or the chosen entry cannot be parsed.
local function service_info()
    local eureka_conf = local_conf.eureka
    local host = eureka_conf and eureka_conf.host
    -- math_random(0) raises, so an empty list is treated like a missing one
    if not host or #host == 0 then
        log.error("do not set eureka.host")
        return
    end

    local basic_auth
    -- TODO Add health check to get healthy nodes.
    local url = host[math_random(#host)]
    local auth_idx = string_find(url, "@", 1, true)
    if auth_idx then
        local protocol_idx = string_find(url, "://", 1, true)
        -- Credentials can only sit between the scheme and the "@". Without
        -- a scheme in front of them the slicing below would do arithmetic
        -- on nil (or cut out garbage), so refuse the entry instead. The URL
        -- itself is not logged because it contains the credentials.
        if not protocol_idx or protocol_idx > auth_idx then
            log.error("invalid eureka.host entry: credentials must follow ",
                      "the scheme (scheme://user:password@host)")
            return
        end

        local protocol = string_sub(url, 1, protocol_idx + 2)
        local user_and_password = string_sub(url, protocol_idx + 3, auth_idx - 1)
        local other = string_sub(url, auth_idx + 1)
        url = protocol .. other
        basic_auth = "Basic " .. ngx.encode_base64(user_and_password)
    end

    if eureka_conf.prefix then
        url = url .. eureka_conf.prefix
    end
    -- callers append a relative path, so make sure the base ends in "/"
    if string_sub(url, -1) ~= "/" then
        url = url .. "/"
    end

    return url, basic_auth
end
-- Send one HTTP request to the Eureka server.
--
-- request_uri: base URL (expected to end in "/"), `path` is appended to it
-- basic_auth:  optional value for the Authorization header
-- method:      HTTP method name
-- query:       passed through to the HTTP client unchanged
-- body:        a table is JSON-encoded and sent as application/json;
--              any other value is passed through unchanged
--
-- Returns whatever httpc:request_uri() returns, or nil plus a message when
-- a table body cannot be JSON-encoded.
local function request(request_uri, basic_auth, method, path, query, body)
    log.info("eureka uri:", request_uri, ".")
    local url = request_uri .. path

    local headers = core.table.new(0, 5)
    headers['Connection'] = 'Keep-Alive'
    headers['Accept'] = 'application/json'
    if basic_auth then
        headers['Authorization'] = basic_auth
    end

    if type(body) == 'table' then
        local encoded, err = core.json.encode(body)
        if not encoded then
            return nil, 'invalid body : ' .. err
        end
        body = encoded
        headers['Content-Type'] = 'application/json'
    end

    local httpc = http.new()

    -- start from the built-in defaults and let the configuration override
    -- each phase individually
    local connect_timeout, send_timeout, read_timeout = 2000, 2000, 5000
    local timeout = local_conf.eureka.timeout
    if timeout then
        connect_timeout = timeout.connect or connect_timeout
        send_timeout = timeout.send or send_timeout
        read_timeout = timeout.read or read_timeout
    end
    log.info("connect_timeout:", connect_timeout, ", send_timeout:", send_timeout,
             ", read_timeout:", read_timeout, ".")
    httpc:set_timeouts(connect_timeout, send_timeout, read_timeout)

    local params = {
        version = 1.1,
        method = method,
        headers = headers,
        query = query,
        body = body,
        -- ssl_verify is switched off for this request
        ssl_verify = false,
    }
    return httpc:request_uri(url, params)
end
-- Return the port number of a port section of an instance record, or nil
-- when the section is missing, malformed, or not enabled.
local function enabled_port(section)
    if type(section) ~= "table" then
        return nil
    end
    if tostring(section["@enabled"]) == "true" and section["$"] then
        return section["$"]
    end
    return nil
end


-- Extract the routable address of one Eureka instance record.
--
-- An instance is only used when its effective status is "UP"; an overridden
-- status other than "UNKNOWN" takes precedence over the plain status.
--
-- Returns: ip, port, metadata. `port` is nil when neither port section is
-- enabled. Returns nothing when the instance is not UP or when its address
-- is not a valid IPv4/IPv6 literal (the latter is logged as an error).
local function parse_instance(instance)
    local status = instance.status
    local overridden_status = instance.overriddenstatus or instance.overriddenStatus
    if overridden_status and overridden_status ~= "UNKNOWN" then
        status = overridden_status
    end

    if status ~= "UP" then
        return
    end

    -- A record lacking a port or securePort section must not raise here:
    -- an error would abort the registry refresh for every application.
    -- The secure port wins when both are enabled.
    local port = enabled_port(instance.securePort) or enabled_port(instance.port)

    local ip = instance.ipAddr
    if not ipmatcher.parse_ipv4(ip) and
       not ipmatcher.parse_ipv6(ip) then
        log.error(instance.app, " service ", instance.hostName, " node IP ", ip,
                  " is invalid(must be IPv4 or IPv6).")
        return
    end

    return ip, port, instance.metadata
end
-- Timer callback: download the full registry from Eureka and replace the
-- module-level `applications` table with the nodes of all usable instances,
-- keyed by application name.
--
-- premature: set by the timer machinery when the callback runs early; the
--            fetch is skipped in that case.
--
-- On any failure (no usable host, request error, non-200 status, body that
-- is not the expected JSON shape) an error is logged and the previously
-- fetched `applications` table is left untouched.
local function fetch_full_registry(premature)
    if premature then
        return
    end

    local request_uri, basic_auth = service_info()
    if not request_uri then
        return
    end

    local res, err = request(request_uri, basic_auth, "GET", "apps")
    if not res then
        log.error("failed to fetch registry: ", err)
        return
    end

    if not res.body or res.status ~= 200 then
        log.error("failed to fetch registry, status = ", res.status)
        return
    end

    local json_str = res.body
    local data, decode_err = core.json.decode(json_str)
    if not data then
        log.error("invalid response body: ", json_str, " err: ", decode_err)
        return
    end

    -- Valid JSON is not necessarily a registry document; check the shape
    -- before indexing so an unexpected payload cannot raise in the timer.
    local registry = type(data) == "table" and data.applications
    local apps = type(registry) == "table" and registry.application
    if type(apps) ~= "table" then
        log.error("invalid response body: ", json_str,
                  " err: missing applications.application")
        return
    end

    local up_apps = core.table.new(0, #apps)
    for _, app in ipairs(apps) do
        local instances = app.instance
        -- skip applications that carry no instance list
        if type(instances) == "table" then
            for _, instance in ipairs(instances) do
                local ip, port, metadata = parse_instance(instance)
                if ip and port then
                    local nodes = up_apps[app.name]
                    if not nodes then
                        nodes = core.table.new(#instances, 0)
                        up_apps[app.name] = nodes
                    end
                    core.table.insert(nodes, {
                        host = ip,
                        port = port,
                        weight = metadata and metadata.weight or default_weight,
                        metadata = metadata,
                    })
                    if metadata then
                        -- remove useless data: the weight now lives on the
                        -- node itself (this edits the table stored above)
                        metadata.weight = nil
                    end
                end
            end
        end
    end

    applications = up_apps
end
-- Look up the nodes recorded for `service_name` in the most recently
-- fetched registry.
--
-- Returns the node list stored under that name (nil when the name is
-- unknown). When no registry has been stored yet, logs an error and returns
-- nothing.
function _M.nodes(service_name)
    if applications then
        return applications[service_name]
    end

    log.error("failed to fetch nodes for : ", service_name)
end
-- Validate the `eureka` configuration and start the registry refresh:
-- one immediate fetch plus a recurring one every `fetch_interval`.
--
-- Raises an error when `eureka.host` is missing or empty, or when the
-- configuration does not satisfy the schema. Failure to create a timer is
-- logged rather than raised.
function _M.init_worker()
    local eureka_conf = local_conf.eureka
    -- error() does not return, so no statement is needed after it
    if not eureka_conf or not eureka_conf.host or #eureka_conf.host == 0 then
        error("do not set eureka.host")
    end

    local ok, err = core.schema.check(schema, eureka_conf)
    if not ok then
        error("invalid eureka configuration: " .. err)
    end

    default_weight = eureka_conf.weight or 100
    log.info("default_weight:", default_weight, ".")
    local fetch_interval = eureka_conf.fetch_interval or 30
    log.info("fetch_interval:", fetch_interval, ".")

    -- Timer creation can fail; report it so a registry that never gets
    -- populated can be traced back to its cause.
    local timer_ok, timer_err = ngx_timer_at(0, fetch_full_registry)
    if not timer_ok then
        log.error("failed to create timer for the initial eureka fetch: ",
                  timer_err)
    end

    timer_ok, timer_err = ngx_timer_every(fetch_interval, fetch_full_registry)
    if not timer_ok then
        log.error("failed to create timer for the periodic eureka fetch: ",
                  timer_err)
    end
end
return _M