Skip to content

Commit fcce6b5

Browse files
committed
Remove cluster client.
influxdata#339
1 parent c7d314b commit fcce6b5

File tree

5 files changed

+1
-319
lines changed

5 files changed

+1
-319
lines changed

README.rst

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -108,20 +108,6 @@ Here's a basic example (for more see the examples directory)::
108108

109109
>>> print("Result: {0}".format(result))
110110

111-
If you want to connect to a cluster, you could initialize a ``InfluxDBClusterClient``::
112-
113-
$ python
114-
115-
>>> from influxdb import InfluxDBClusterClient
116-
117-
>>> cc = InfluxDBClusterClient(hosts = [('192.168.0.1', 8086),
118-
('192.168.0.2', 8086),
119-
('192.168.0.3', 8086)],
120-
username='root',
121-
password='root',
122-
database='example')
123-
124-
``InfluxDBClusterClient`` has the same methods as ``InfluxDBClient``, it basically is a proxy to multiple InfluxDBClients.
125111

126112
Testing
127113
=======

docs/source/api-documentation.rst

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -45,16 +45,6 @@ These clients are initiated in the same way as the
4545
:members:
4646
:undoc-members:
4747

48-
------------------------------
49-
:class:`InfluxDBClusterClient`
50-
------------------------------
51-
52-
53-
.. currentmodule:: influxdb.InfluxDBClusterClient
54-
.. autoclass:: influxdb.InfluxDBClusterClient
55-
:members:
56-
:undoc-members:
57-
5848
------------------------
5949
:class:`DataFrameClient`
6050
------------------------

influxdb/__init__.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,12 @@
66
from __future__ import unicode_literals
77

88
from .client import InfluxDBClient
9-
from .client import InfluxDBClusterClient
109
from .dataframe_client import DataFrameClient
1110
from .helper import SeriesHelper
1211

1312

1413
__all__ = [
1514
'InfluxDBClient',
16-
'InfluxDBClusterClient',
1715
'DataFrameClient',
1816
'SeriesHelper',
1917
]

influxdb/client.py

Lines changed: 0 additions & 168 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,8 @@
77
from __future__ import print_function
88
from __future__ import unicode_literals
99

10-
from functools import wraps
1110
import json
1211
import socket
13-
import time
14-
import threading
15-
import random
1612
import requests
1713
import requests.exceptions
1814
from sys import version_info
@@ -114,8 +110,6 @@ def __init__(self,
114110
'Accept': 'text/plain'
115111
}
116112

117-
# _baseurl, _host and _port are properties to allow InfluxDBClusterClient
118-
# to override them with thread-local variables
119113
@property
120114
def _baseurl(self):
121115
return self._get_baseurl()
@@ -772,168 +766,6 @@ def send_packet(self, packet):
772766
self.udp_socket.sendto(data, (self._host, self.udp_port))
773767

774768

775-
class InfluxDBClusterClient(object):
776-
"""The :class:`~.InfluxDBClusterClient` is the client for connecting
777-
to a cluster of InfluxDB servers. Each query hits different host from the
778-
list of hosts.
779-
780-
:param hosts: all hosts to be included in the cluster, each of which
781-
should be in the format (address, port),
782-
e.g. [('127.0.0.1', 8086), ('127.0.0.1', 9096)]. Defaults to
783-
[('localhost', 8086)]
784-
:type hosts: list of tuples
785-
:param shuffle: whether the queries should hit servers evenly(randomly),
786-
defaults to True
787-
:type shuffle: bool
788-
:param client_base_class: the base class for the cluster client.
789-
This parameter is used to enable the support of different client
790-
types. Defaults to :class:`~.InfluxDBClient`
791-
:param healing_delay: the delay in seconds, counting from last failure of
792-
a server, before re-adding server to the list of working servers.
793-
Defaults to 15 minutes (900 seconds)
794-
"""
795-
796-
def __init__(self,
797-
hosts=[('localhost', 8086)],
798-
username='root',
799-
password='root',
800-
database=None,
801-
ssl=False,
802-
verify_ssl=False,
803-
timeout=None,
804-
use_udp=False,
805-
udp_port=4444,
806-
shuffle=True,
807-
client_base_class=InfluxDBClient,
808-
healing_delay=900,
809-
):
810-
self.clients = [self] # Keep it backwards compatible
811-
self.hosts = hosts
812-
self.bad_hosts = [] # Corresponding server has failures in history
813-
self.shuffle = shuffle
814-
self.healing_delay = healing_delay
815-
self._last_healing = time.time()
816-
host, port = self.hosts[0]
817-
self._hosts_lock = threading.Lock()
818-
self._thread_local = threading.local()
819-
self._client = client_base_class(host=host,
820-
port=port,
821-
username=username,
822-
password=password,
823-
database=database,
824-
ssl=ssl,
825-
verify_ssl=verify_ssl,
826-
timeout=timeout,
827-
use_udp=use_udp,
828-
udp_port=udp_port)
829-
for method in dir(client_base_class):
830-
orig_attr = getattr(client_base_class, method, '')
831-
if method.startswith('_') or not callable(orig_attr):
832-
continue
833-
834-
setattr(self, method, self._make_func(orig_attr))
835-
836-
self._client._get_host = self._get_host
837-
self._client._get_port = self._get_port
838-
self._client._get_baseurl = self._get_baseurl
839-
self._update_client_host(self.hosts[0])
840-
841-
@staticmethod
842-
def from_DSN(dsn, client_base_class=InfluxDBClient,
843-
shuffle=True, **kwargs):
844-
"""Same as :meth:`~.InfluxDBClient.from_DSN`, but supports
845-
multiple servers.
846-
847-
:param shuffle: whether the queries should hit servers
848-
evenly(randomly), defaults to True
849-
:type shuffle: bool
850-
:param client_base_class: the base class for all clients in the
851-
cluster. This parameter is used to enable the support of
852-
different client types. Defaults to :class:`~.InfluxDBClient`
853-
854-
:Example:
855-
856-
::
857-
858-
>> cluster = InfluxDBClusterClient.from_DSN('influxdb://usr:pwd\
859-
@host1:8086,usr:pwd@host2:8086/db_name', timeout=5)
860-
>> type(cluster)
861-
<class 'influxdb.client.InfluxDBClusterClient'>
862-
>> cluster.hosts
863-
[('host1', 8086), ('host2', 8086)]
864-
>> cluster._client
865-
<influxdb.client.InfluxDBClient at 0x7feb438ec950>]
866-
"""
867-
init_args = parse_dsn(dsn)
868-
init_args.update(**kwargs)
869-
init_args['shuffle'] = shuffle
870-
init_args['client_base_class'] = client_base_class
871-
cluster_client = InfluxDBClusterClient(**init_args)
872-
return cluster_client
873-
874-
def _update_client_host(self, host):
875-
self._thread_local.host, self._thread_local.port = host
876-
self._thread_local.baseurl = "{0}://{1}:{2}".format(
877-
self._client._scheme,
878-
self._client._host,
879-
self._client._port
880-
)
881-
882-
def _get_baseurl(self):
883-
return self._thread_local.baseurl
884-
885-
def _get_host(self):
886-
return self._thread_local.host
887-
888-
def _get_port(self):
889-
return self._thread_local.port
890-
891-
def _make_func(self, orig_func):
892-
893-
@wraps(orig_func)
894-
def func(*args, **kwargs):
895-
now = time.time()
896-
with self._hosts_lock:
897-
if (self.bad_hosts and
898-
self._last_healing + self.healing_delay < now):
899-
h = self.bad_hosts.pop(0)
900-
self.hosts.append(h)
901-
self._last_healing = now
902-
903-
if self.shuffle:
904-
random.shuffle(self.hosts)
905-
906-
hosts = self.hosts + self.bad_hosts
907-
908-
for h in hosts:
909-
bad_host = False
910-
try:
911-
self._update_client_host(h)
912-
return orig_func(self._client, *args, **kwargs)
913-
except InfluxDBClientError as e:
914-
# Errors caused by user's requests, re-raise
915-
raise e
916-
except ValueError as e:
917-
raise e
918-
except Exception as e:
919-
# Errors that might caused by server failure, try another
920-
bad_host = True
921-
with self._hosts_lock:
922-
if h in self.hosts:
923-
self.hosts.remove(h)
924-
self.bad_hosts.append(h)
925-
self._last_healing = now
926-
finally:
927-
with self._hosts_lock:
928-
if not bad_host and h in self.bad_hosts:
929-
self.bad_hosts.remove(h)
930-
self.hosts.append(h)
931-
932-
raise InfluxDBServerError("InfluxDB: no viable server!")
933-
934-
return func
935-
936-
937769
def parse_dsn(dsn):
938770
conn_params = urlparse(dsn)
939771
init_args = {}

influxdb/tests/client_test.py

Lines changed: 1 addition & 125 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323
import requests
2424
import requests.exceptions
2525
import socket
26-
import time
2726
import requests_mock
2827
import random
2928
from nose.tools import raises
@@ -32,8 +31,7 @@
3231
import mock
3332
import unittest
3433

35-
from influxdb import InfluxDBClient, InfluxDBClusterClient
36-
from influxdb.client import InfluxDBServerError
34+
from influxdb import InfluxDBClient
3735

3836

3937
def _build_response_object(status_code=200, content=""):
@@ -846,125 +844,3 @@ def query(self,
846844
raise Exception("Fail Twice")
847845
else:
848846
return "Success"
849-
850-
851-
class TestInfluxDBClusterClient(unittest.TestCase):
852-
853-
def setUp(self):
854-
# By default, raise exceptions on warnings
855-
warnings.simplefilter('error', FutureWarning)
856-
857-
self.hosts = [('host1', 8086), ('host2', 8086), ('host3', 8086)]
858-
self.dsn_string = 'influxdb://uSr:pWd@host1:8086,uSr:pWd@host2:8086/db'
859-
860-
def test_init(self):
861-
cluster = InfluxDBClusterClient(hosts=self.hosts,
862-
username='username',
863-
password='password',
864-
database='database',
865-
shuffle=False,
866-
client_base_class=FakeClient)
867-
self.assertEqual(3, len(cluster.hosts))
868-
self.assertEqual(0, len(cluster.bad_hosts))
869-
self.assertIn((cluster._client._host,
870-
cluster._client._port), cluster.hosts)
871-
872-
def test_one_server_fails(self):
873-
cluster = InfluxDBClusterClient(hosts=self.hosts,
874-
database='database',
875-
shuffle=False,
876-
client_base_class=FakeClient)
877-
self.assertEqual('Success', cluster.query('Fail once'))
878-
self.assertEqual(2, len(cluster.hosts))
879-
self.assertEqual(1, len(cluster.bad_hosts))
880-
881-
def test_two_servers_fail(self):
882-
cluster = InfluxDBClusterClient(hosts=self.hosts,
883-
database='database',
884-
shuffle=False,
885-
client_base_class=FakeClient)
886-
self.assertEqual('Success', cluster.query('Fail twice'))
887-
self.assertEqual(1, len(cluster.hosts))
888-
self.assertEqual(2, len(cluster.bad_hosts))
889-
890-
def test_all_fail(self):
891-
cluster = InfluxDBClusterClient(hosts=self.hosts,
892-
database='database',
893-
shuffle=True,
894-
client_base_class=FakeClient)
895-
with self.assertRaises(InfluxDBServerError):
896-
cluster.query('Fail')
897-
self.assertEqual(0, len(cluster.hosts))
898-
self.assertEqual(3, len(cluster.bad_hosts))
899-
900-
def test_all_good(self):
901-
cluster = InfluxDBClusterClient(hosts=self.hosts,
902-
database='database',
903-
shuffle=True,
904-
client_base_class=FakeClient)
905-
self.assertEqual('Success', cluster.query(''))
906-
self.assertEqual(3, len(cluster.hosts))
907-
self.assertEqual(0, len(cluster.bad_hosts))
908-
909-
def test_recovery(self):
910-
cluster = InfluxDBClusterClient(hosts=self.hosts,
911-
database='database',
912-
shuffle=True,
913-
client_base_class=FakeClient)
914-
with self.assertRaises(InfluxDBServerError):
915-
cluster.query('Fail')
916-
self.assertEqual('Success', cluster.query(''))
917-
self.assertEqual(1, len(cluster.hosts))
918-
self.assertEqual(2, len(cluster.bad_hosts))
919-
920-
def test_healing(self):
921-
cluster = InfluxDBClusterClient(hosts=self.hosts,
922-
database='database',
923-
shuffle=True,
924-
healing_delay=1,
925-
client_base_class=FakeClient)
926-
with self.assertRaises(InfluxDBServerError):
927-
cluster.query('Fail')
928-
self.assertEqual('Success', cluster.query(''))
929-
time.sleep(1.1)
930-
self.assertEqual('Success', cluster.query(''))
931-
self.assertEqual(2, len(cluster.hosts))
932-
self.assertEqual(1, len(cluster.bad_hosts))
933-
time.sleep(1.1)
934-
self.assertEqual('Success', cluster.query(''))
935-
self.assertEqual(3, len(cluster.hosts))
936-
self.assertEqual(0, len(cluster.bad_hosts))
937-
938-
def test_dsn(self):
939-
cli = InfluxDBClusterClient.from_DSN(self.dsn_string)
940-
self.assertEqual([('host1', 8086), ('host2', 8086)], cli.hosts)
941-
self.assertEqual('http://host1:8086', cli._client._baseurl)
942-
self.assertEqual('uSr', cli._client._username)
943-
self.assertEqual('pWd', cli._client._password)
944-
self.assertEqual('db', cli._client._database)
945-
self.assertFalse(cli._client.use_udp)
946-
947-
cli = InfluxDBClusterClient.from_DSN('udp+' + self.dsn_string)
948-
self.assertTrue(cli._client.use_udp)
949-
950-
cli = InfluxDBClusterClient.from_DSN('https+' + self.dsn_string)
951-
self.assertEqual('https://host1:8086', cli._client._baseurl)
952-
953-
cli = InfluxDBClusterClient.from_DSN('https+' + self.dsn_string,
954-
**{'ssl': False})
955-
self.assertEqual('http://host1:8086', cli._client._baseurl)
956-
957-
def test_dsn_password_caps(self):
958-
cli = InfluxDBClusterClient.from_DSN(
959-
'https+influxdb://usr:pWd@host:8086/db')
960-
self.assertEqual('pWd', cli._client._password)
961-
962-
def test_dsn_mixed_scheme_case(self):
963-
cli = InfluxDBClusterClient.from_DSN(
964-
'hTTps+inFLUxdb://usr:pWd@host:8086/db')
965-
self.assertEqual('pWd', cli._client._password)
966-
self.assertEqual('https://host:8086', cli._client._baseurl)
967-
968-
cli = InfluxDBClusterClient.from_DSN(
969-
'uDP+influxdb://usr:pwd@host1:8086,usr:pwd@host2:8086/db')
970-
self.assertTrue(cli._client.use_udp)

0 commit comments

Comments
 (0)