Skip to content

Commit cf62582

Browse files
Fix by comments in PR. Make refresh before request
1 parent b087be0 commit cf62582

File tree

9 files changed

+220
-222
lines changed

9 files changed

+220
-222
lines changed

.travis.yml

Lines changed: 15 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -10,28 +10,15 @@ env:
1010
- TARGET=test
1111
- OS=el DIST=6
1212
- OS=el DIST=7
13-
- OS=fedora DIST=24
14-
- OS=fedora DIST=25
15-
- OS=ubuntu DIST=precise
13+
- OS=fedora DIST=28
14+
- OS=fedora DIST=29
1615
- OS=ubuntu DIST=trusty
1716
- OS=ubuntu DIST=xenial
18-
# - OS=debian DIST=wheezy
17+
- OS=ubuntu DIST=bionic
18+
- OS=ubuntu DIST=disco
1919
- OS=debian DIST=jessie
2020
- OS=debian DIST=stretch
21-
22-
matrix:
23-
allow_failures:
24-
# - env: OS=el DIST=6
25-
# - env: OS=el DIST=7
26-
# - env: OS=fedora DIST=24
27-
# - env: OS=fedora DIST=25
28-
# - env: OS=ubuntu DIST=precise
29-
# - env: OS=ubuntu DIST=trusty
30-
# - env: OS=ubuntu DIST=xenial
31-
# - env: OS=ubuntu DIST=yakkety
32-
# - env: OS=debian DIST=wheezy
33-
# - env: OS=debian DIST=jessie
34-
# - env: OS=debian DIST=stretch
21+
- OS=debian DIST=buster
3522

3623
script:
3724
- git describe --long
@@ -100,6 +87,16 @@ deploy:
10087
on:
10188
branch: master
10289
condition: -n "${OS}" && -n "${DIST}" && -n "${PACKAGECLOUD_TOKEN}"
90+
- provider: packagecloud
91+
username: tarantool
92+
repository: "2_2"
93+
token: ${PACKAGECLOUD_TOKEN}
94+
dist: ${OS}/${DIST}
95+
package_glob: build/*.{rpm,deb}
96+
skip_cleanup: true
97+
on:
98+
branch: master
99+
condition: -n "${OS}" && -n "${DIST}" && -n "${PACKAGECLOUD_TOKEN}"
103100

104101
notifications:
105102
email:

tarantool/connection.py

Lines changed: 11 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -126,14 +126,14 @@ def __init__(self, host, port,
126126
self.encoding = encoding
127127
self.call_16 = call_16
128128
self.connection_timeout = connection_timeout
129-
self.authenticated = False
130129
if connect_now:
131130
self.connect()
132131

133132
def close(self):
134133
'''
135134
Close connection to the server
136135
'''
136+
self.connected = False
137137
self._socket.close()
138138
self._socket = None
139139

@@ -201,11 +201,9 @@ def connect(self):
201201
:raise: `NetworkError`
202202
'''
203203
try:
204-
self.authenticated = False
205204
self.connect_basic()
206205
self.handshake()
207206
self.load_schema()
208-
self.authenticated = True
209207
except Exception as e:
210208
self.connected = False
211209
raise NetworkError(e)
@@ -362,47 +360,34 @@ def flush_schema(self):
362360
self.schema.flush()
363361
self.load_schema()
364362

365-
def call(self, func_name, *args):
363+
def call(self, func_name, *args, reconnect=True):
366364
'''
367365
Execute CALL request. Call stored Lua function.
368366
369367
:param func_name: stored Lua function name
370368
:type func_name: str
371-
:param args: list of function arguments
372-
:type args: list or tuple
369+
:param args: function arguments
370+
:type args: tuple
371+
:param reconnect: reconnect before call
372+
:type reconnect: boolean
373373
374374
:rtype: `Response` instance
375375
'''
376376

377+
assert isinstance(func_name, str)
378+
377379
# This allows to use a tuple or list as an argument
378380
if len(args) == 1 and isinstance(args[0], (list, tuple)):
379381
args = args[0]
380382

381-
return self.call_ex(func_name, args, True)
382-
383-
def call_ex(self, func_name, args=[], reconnect=True):
384-
'''
385-
Execute CALL request. Call stored Lua function.
386-
387-
:param func_name: stored Lua function name
388-
:type func_name: str
389-
:param args: list of function arguments
390-
:type args: list or tuple
391-
:param reconnect: reconnect before call
392-
:type reconnect: boolean
393-
394-
:rtype: `Response` instance
395-
'''
396-
assert isinstance(func_name, str)
397-
398383
request = RequestCall(self, func_name, args, self.call_16)
399384
if reconnect:
400385
response = self._send_request(request)
401386
else:
402387
response = self._send_request_wo_reconnect(request)
403388
return response
404389

405-
def eval(self, expr, *args):
390+
def eval(self, expr, *args, reconnect=True):
406391
'''
407392
Execute EVAL request. Eval Lua expression.
408393
@@ -413,28 +398,12 @@ def eval(self, expr, *args):
413398
414399
:rtype: `Response` instance
415400
'''
401+
assert isinstance(expr, str)
416402

417403
# This allows to use a tuple or list as an argument
418404
if len(args) == 1 and isinstance(args[0], (list, tuple)):
419405
args = args[0]
420-
421-
return self.eval_ex(expr, args, True)
422-
423-
def eval_ex(self, expr, args=[], reconnect=True):
424-
'''
425-
Execute EVAL request. Eval Lua expression.
426-
427-
:param expr: Lua expression
428-
:type expr: str
429-
:param args: list of function arguments
430-
:type args: list or tuple
431-
:param reconnect: reconnect before call
432-
:type reconnect: boolean
433-
434-
:rtype: `Response` instance
435-
'''
436-
assert isinstance(expr, str)
437-
406+
438407
request = RequestEval(self, expr, args)
439408
if reconnect:
440409
response = self._send_request(request)

tarantool/const.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,4 +87,4 @@
8787
# Default delay between attempts to reconnect (seconds)
8888
RECONNECT_DELAY = 0.1
8989
# Default cluster nodes list refresh interval (seconds)
90-
NODES_REFRESH_INTERVAL = 300
90+
DEFAULT_CLUSTER_DISCOVERY_DELAY_MILLIS = 60000

tarantool/mesh_connection.py

Lines changed: 79 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,16 @@
99
from tarantool.error import NetworkError
1010
from tarantool.utils import ENCODING_DEFAULT
1111
from tarantool.const import (
12+
CONNECTION_TIMEOUT,
1213
SOCKET_TIMEOUT,
1314
RECONNECT_MAX_ATTEMPTS,
1415
RECONNECT_DELAY,
15-
NODES_REFRESH_INTERVAL
16+
DEFAULT_CLUSTER_DISCOVERY_DELAY_MILLIS,
1617
)
1718

19+
from tarantool.utils import (
20+
ENCODING_DEFAULT
21+
)
1822

1923
class RoundRobinStrategy(object):
2024
def __init__(self, addrs):
@@ -88,26 +92,36 @@ class MeshConnection(Connection):
8892
end
8993
'''
9094

91-
def __init__(self, addrs,
95+
def __init__(self, host, port,
9296
user=None,
9397
password=None,
9498
socket_timeout=SOCKET_TIMEOUT,
9599
reconnect_max_attempts=RECONNECT_MAX_ATTEMPTS,
96100
reconnect_delay=RECONNECT_DELAY,
97101
connect_now=True,
98102
encoding=ENCODING_DEFAULT,
103+
call_16=False,
104+
connection_timeout=CONNECTION_TIMEOUT,
105+
cluster_list = None,
99106
strategy_class=RoundRobinStrategy,
100107
get_nodes_function_name=None,
101-
nodes_refresh_interval=NODES_REFRESH_INTERVAL):
102-
self.nattempts = 2 * len(addrs) + 1
108+
nodes_refresh_interval=DEFAULT_CLUSTER_DISCOVERY_DELAY_MILLIS):
109+
110+
addrs = [{"host":host, "port":port}]
111+
if cluster_list:
112+
for i in cluster_list:
113+
if i["host"] == host or i["port"] == port:
114+
continue
115+
addrs.append(i)
116+
103117
self.strategy = strategy_class(addrs)
104118
self.strategy_class = strategy_class
105119
addr = self.strategy.getnext()
106120
host = addr['host']
107121
port = addr['port']
108122
self.get_nodes_function_name = get_nodes_function_name
109-
self.nodes_refresh_interval = nodes_refresh_interval >= 30 and nodes_refresh_interval or 30
110-
self.last_nodes_refresh = 0
123+
self.nodes_refresh_interval = nodes_refresh_interval
124+
self.last_nodes_refresh = time.time()
111125
super(MeshConnection, self).__init__(host=host,
112126
port=port,
113127
user=user,
@@ -116,54 +130,69 @@ def __init__(self, addrs,
116130
reconnect_max_attempts=reconnect_max_attempts,
117131
reconnect_delay=reconnect_delay,
118132
connect_now=connect_now,
119-
encoding=encoding)
120-
121-
def _opt_reconnect(self):
122-
nattempts = self.nattempts
123-
while nattempts > 0:
124-
try:
125-
super(MeshConnection, self)._opt_reconnect()
126-
break
127-
except NetworkError:
128-
nattempts -= 1
129-
addr = self.strategy.getnext()
130-
self.host = addr['host']
131-
self.port = addr['port']
132-
else:
133-
raise NetworkError
133+
encoding=encoding,
134+
call_16=call_16,
135+
connection_timeout=connection_timeout)
134136

135-
if self.authenticated and self.get_nodes_function_name:
136-
now = time.time()
137-
if now - self.last_nodes_refresh > self.nodes_refresh_interval:
138-
self.refresh_nodes(now)
137+
def _opt_refresh_instances(self):
138+
"""
139+
Refresh list of cluster instances. If current connection not in server list will change connection.
140+
"""
141+
now = time.time()
142+
143+
if self.connected and now - self.last_nodes_refresh > self.nodes_refresh_interval/1000:
144+
resp = self.call(self.get_nodes_function_name, reconnect=False)
145+
146+
# got data to refresh
147+
if resp.data and resp.data[0]:
148+
addrs = list(parse_uri(i) for i in resp.data[0])
149+
self.strategy = self.strategy_class(addrs)
150+
self.last_nodes_refresh = now
139151

140-
def refresh_nodes(self, cur_time):
141-
'''
142-
Refreshes nodes list by calling Lua function with name
143-
self.get_nodes_function_name on the current node. If this field is None
144-
no refresh occurs. Usually you don't need to call this function manually
145-
since it's called automatically during reconnect every
146-
self.nodes_refresh_interval seconds.
147-
'''
148-
resp = super(MeshConnection, self).call_ex(self.get_nodes_function_name,
149-
[], reconnect=False)
150-
151-
if not (resp.data and resp.data[0]):
152-
return
153-
154-
addrs_raw = resp.data[0]
155-
if type(addrs_raw) is list:
156-
addrs = []
157-
for uri_str in addrs_raw:
158-
addr = parse_uri(uri_str)
159-
if addr:
160-
addrs.append(addr)
161-
162-
self.strategy = self.strategy_class(addrs)
163-
self.last_nodes_refresh = cur_time
164152
if not {'host': self.host, 'port': self.port} in addrs:
165153
addr = self.strategy.getnext()
166154
self.host = addr['host']
167155
self.port = addr['port']
168156
self.close()
169-
self._opt_reconnect()
157+
158+
if not self.connected:
159+
160+
nattempts = (len(self.strategy.addrs) * 2) + 1
161+
162+
while nattempts >= 0:
163+
try:
164+
addr = self.strategy.getnext()
165+
if addr['host'] != self.host or addr['port'] != self.port:
166+
self.host = addr['host']
167+
self.port = addr['port']
168+
self._opt_reconnect()
169+
break
170+
else:
171+
nattempts -= 1
172+
except NetworkError:
173+
continue
174+
else:
175+
raise NetworkError
176+
177+
def _send_request(self, request):
178+
'''
179+
Send the request to the server through the socket.
180+
Return an instance of `Response` class.
181+
182+
Update instances list from server `get_nodes_function_name` function.
183+
184+
:param request: object representing a request
185+
:type request: `Request` instance
186+
187+
:rtype: `Response` instance
188+
'''
189+
if self.get_nodes_function_name:
190+
self._opt_refresh_instances()
191+
192+
try:
193+
return super(MeshConnection, self)._send_request(request)
194+
except NetworkError:
195+
self.connected = False
196+
self._opt_refresh_instances()
197+
finally:
198+
return super(MeshConnection, self)._send_request(request)

test/cluster-py/multi.test.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ def check_connection(con):
4242
# Make a list of servers
4343
sources = []
4444
for server in cluster[1:]:
45-
sources.append(yaml.load(server.admin('box.cfg.listen', silent=True))[0])
45+
sources.append(yaml.safe_load(server.admin('box.cfg.listen', silent=True))[0])
4646

4747
addrs = []
4848
for addr in sources:

unit/suites/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,9 +8,10 @@
88
from .test_dml import TestSuite_Request
99
from .test_protocol import TestSuite_Protocol
1010
from .test_reconnect import TestSuite_Reconnect
11+
from .test_mesh import TestSuite_Mesh
1112

1213
test_cases = (TestSuite_Schema, TestSuite_Request, TestSuite_Protocol,
13-
TestSuite_Reconnect)
14+
TestSuite_Reconnect, TestSuite_Mesh)
1415

1516
def load_tests(loader, tests, pattern):
1617
suite = unittest.TestSuite()

unit/suites/lib/tarantool_admin.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,4 +61,4 @@ def execute(self, command):
6161
if (res.rfind("\n...\n") >= 0 or res.rfind("\r\n...\r\n") >= 0):
6262
break
6363

64-
return yaml.load(res)
64+
return yaml.safe_load(res)

0 commit comments

Comments
 (0)