Skip to content
This repository was archived by the owner on Oct 29, 2024. It is now read-only.

Re-add cluster-client in legacy influxdb011 module #396

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ InfluxDB pre v1.1.0 users

InfluxDB 1.1.0 was released and it is the new recommended version. InfluxDB 0.8.x users may still use the legacy client by using ``from influxdb.influxdb08 import InfluxDBClient`` instead.

InfluxDB 0.11 users with cluster may still use the legacy cluster client by using ``from influxdb.influxdb011 import InfluxDBClusterClient`` instead.

Installation
============

Expand Down
3 changes: 3 additions & 0 deletions influxdb/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ def __init__(self,
'Accept': 'text/plain'
}

# _baseurl, _host and _port are properties to allow
# influxdb011.InfluxDBClusterClient to override them with
# thread-local variables
@property
def _baseurl(self):
return self._get_baseurl()
Expand Down
188 changes: 188 additions & 0 deletions influxdb/influxdb011.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
# -*- coding: utf-8 -*-
"""
Python cluster client for InfluxDB 0.11

WARNING: only use this code with a old InfluxDB 0.11 where all node are
equivalent.
Newer cluster using influxdb-relay, node are NOT requivalent and this code
should not be used or you will end up with inconsitancy.
"""

from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

from functools import wraps
import time
import threading
import random

from .client import InfluxDBClient, parse_dsn
from .exceptions import InfluxDBClientError
from .exceptions import InfluxDBServerError


class InfluxDBClusterClient(object):
"""The :class:`~.InfluxDBClusterClient` is the client for connecting
to a cluster of InfluxDB servers. Each query hits different host from the
list of hosts.

This works with old cluster using InfluxDB 0.11. NOT the newer cluster with
influxdb-relay.

:param hosts: all hosts to be included in the cluster, each of which
should be in the format (address, port),
e.g. [('127.0.0.1', 8086), ('127.0.0.1', 9096)]. Defaults to
[('localhost', 8086)]
:type hosts: list of tuples
:param shuffle: whether the queries should hit servers evenly(randomly),
defaults to True
:type shuffle: bool
:param client_base_class: the base class for the cluster client.
This parameter is used to enable the support of different client
types. Defaults to :class:`~.InfluxDBClient`
:param healing_delay: the delay in seconds, counting from last failure of
a server, before re-adding server to the list of working servers.
Defaults to 15 minutes (900 seconds)
"""

def __init__(self,
hosts=[('localhost', 8086)],
username='root',
password='root',
database=None,
ssl=False,
verify_ssl=False,
timeout=None,
use_udp=False,
udp_port=4444,
shuffle=True,
client_base_class=InfluxDBClient,
healing_delay=900,
):
self.clients = [self] # Keep it backwards compatible
self.hosts = hosts
self.bad_hosts = [] # Corresponding server has failures in history
self.shuffle = shuffle
self.healing_delay = healing_delay
self._last_healing = time.time()
host, port = self.hosts[0]
self._hosts_lock = threading.Lock()
self._thread_local = threading.local()
self._client = client_base_class(host=host,
port=port,
username=username,
password=password,
database=database,
ssl=ssl,
verify_ssl=verify_ssl,
timeout=timeout,
use_udp=use_udp,
udp_port=udp_port)
for method in dir(client_base_class):
orig_attr = getattr(client_base_class, method, '')
if method.startswith('_') or not callable(orig_attr):
continue

setattr(self, method, self._make_func(orig_attr))

self._client._get_host = self._get_host
self._client._get_port = self._get_port
self._client._get_baseurl = self._get_baseurl
self._update_client_host(self.hosts[0])

@staticmethod
def from_DSN(dsn, client_base_class=InfluxDBClient,
shuffle=True, **kwargs):
"""Same as :meth:`~.InfluxDBClient.from_DSN`, but supports
multiple servers.

:param shuffle: whether the queries should hit servers
evenly(randomly), defaults to True
:type shuffle: bool
:param client_base_class: the base class for all clients in the
cluster. This parameter is used to enable the support of
different client types. Defaults to :class:`~.InfluxDBClient`

:Example:

::

>> cluster = InfluxDBClusterClient.from_DSN('influxdb://usr:pwd\
@host1:8086,usr:pwd@host2:8086/db_name', timeout=5)
>> type(cluster)
<class 'influxdb.client.InfluxDBClusterClient'>
>> cluster.hosts
[('host1', 8086), ('host2', 8086)]
>> cluster._client
<influxdb.client.InfluxDBClient at 0x7feb438ec950>]
"""
init_args = parse_dsn(dsn)
init_args.update(**kwargs)
init_args['shuffle'] = shuffle
init_args['client_base_class'] = client_base_class
cluster_client = InfluxDBClusterClient(**init_args)
return cluster_client

def _update_client_host(self, host):
self._thread_local.host, self._thread_local.port = host
self._thread_local.baseurl = "{0}://{1}:{2}".format(
self._client._scheme,
self._client._host,
self._client._port
)

def _get_baseurl(self):
return self._thread_local.baseurl

def _get_host(self):
return self._thread_local.host

def _get_port(self):
return self._thread_local.port

def _make_func(self, orig_func):

@wraps(orig_func)
def func(*args, **kwargs):
now = time.time()
with self._hosts_lock:
if (self.bad_hosts and
self._last_healing + self.healing_delay < now):
h = self.bad_hosts.pop(0)
self.hosts.append(h)
self._last_healing = now

if self.shuffle:
random.shuffle(self.hosts)

hosts = self.hosts + self.bad_hosts

for h in hosts:
bad_host = False
try:
self._update_client_host(h)
return orig_func(self._client, *args, **kwargs)
except InfluxDBClientError as e:
# Errors caused by user's requests, re-raise
raise e
except ValueError as e:
raise e
except Exception as e:
# Errors that might caused by server failure, try another
bad_host = True
with self._hosts_lock:
if h in self.hosts:
self.hosts.remove(h)
self.bad_hosts.append(h)
self._last_healing = now
finally:
with self._hosts_lock:
if not bad_host and h in self.bad_hosts:
self.bad_hosts.remove(h)
self.hosts.append(h)

raise InfluxDBServerError("InfluxDB: no viable server!")

return func
148 changes: 148 additions & 0 deletions influxdb/tests/influxdb011_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
# -*- coding: utf-8 -*-
"""
unit tests for the influxdb011.InfluxDBClusterClient.

NB/WARNING :
This module implements tests for the InfluxDBClusterClient class
but does so
+ without any server instance running
+ by mocking all the expected responses.

So any change of (response format from) the server will **NOT** be
detected by this module.

"""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import time
import warnings
import unittest

from influxdb.influxdb011 import InfluxDBClusterClient
from influxdb.client import InfluxDBServerError
from .client_test import FakeClient


class TestInfluxDBClusterClient(unittest.TestCase):

def setUp(self):
# By default, raise exceptions on warnings
warnings.simplefilter('error', FutureWarning)

self.hosts = [('host1', 8086), ('host2', 8086), ('host3', 8086)]
self.dsn_string = 'influxdb://uSr:pWd@host1:8086,uSr:pWd@host2:8086/db'

def test_init(self):
cluster = InfluxDBClusterClient(hosts=self.hosts,
username='username',
password='password',
database='database',
shuffle=False,
client_base_class=FakeClient)
self.assertEqual(3, len(cluster.hosts))
self.assertEqual(0, len(cluster.bad_hosts))
self.assertIn((cluster._client._host,
cluster._client._port), cluster.hosts)

def test_one_server_fails(self):
cluster = InfluxDBClusterClient(hosts=self.hosts,
database='database',
shuffle=False,
client_base_class=FakeClient)
self.assertEqual('Success', cluster.query('Fail once'))
self.assertEqual(2, len(cluster.hosts))
self.assertEqual(1, len(cluster.bad_hosts))

def test_two_servers_fail(self):
cluster = InfluxDBClusterClient(hosts=self.hosts,
database='database',
shuffle=False,
client_base_class=FakeClient)
self.assertEqual('Success', cluster.query('Fail twice'))
self.assertEqual(1, len(cluster.hosts))
self.assertEqual(2, len(cluster.bad_hosts))

def test_all_fail(self):
cluster = InfluxDBClusterClient(hosts=self.hosts,
database='database',
shuffle=True,
client_base_class=FakeClient)
with self.assertRaises(InfluxDBServerError):
cluster.query('Fail')
self.assertEqual(0, len(cluster.hosts))
self.assertEqual(3, len(cluster.bad_hosts))

def test_all_good(self):
cluster = InfluxDBClusterClient(hosts=self.hosts,
database='database',
shuffle=True,
client_base_class=FakeClient)
self.assertEqual('Success', cluster.query(''))
self.assertEqual(3, len(cluster.hosts))
self.assertEqual(0, len(cluster.bad_hosts))

def test_recovery(self):
cluster = InfluxDBClusterClient(hosts=self.hosts,
database='database',
shuffle=True,
client_base_class=FakeClient)
with self.assertRaises(InfluxDBServerError):
cluster.query('Fail')
self.assertEqual('Success', cluster.query(''))
self.assertEqual(1, len(cluster.hosts))
self.assertEqual(2, len(cluster.bad_hosts))

def test_healing(self):
cluster = InfluxDBClusterClient(hosts=self.hosts,
database='database',
shuffle=True,
healing_delay=1,
client_base_class=FakeClient)
with self.assertRaises(InfluxDBServerError):
cluster.query('Fail')
self.assertEqual('Success', cluster.query(''))
time.sleep(1.1)
self.assertEqual('Success', cluster.query(''))
self.assertEqual(2, len(cluster.hosts))
self.assertEqual(1, len(cluster.bad_hosts))
time.sleep(1.1)
self.assertEqual('Success', cluster.query(''))
self.assertEqual(3, len(cluster.hosts))
self.assertEqual(0, len(cluster.bad_hosts))

def test_dsn(self):
cli = InfluxDBClusterClient.from_DSN(self.dsn_string)
self.assertEqual([('host1', 8086), ('host2', 8086)], cli.hosts)
self.assertEqual('http://host1:8086', cli._client._baseurl)
self.assertEqual('uSr', cli._client._username)
self.assertEqual('pWd', cli._client._password)
self.assertEqual('db', cli._client._database)
self.assertFalse(cli._client.use_udp)

cli = InfluxDBClusterClient.from_DSN('udp+' + self.dsn_string)
self.assertTrue(cli._client.use_udp)

cli = InfluxDBClusterClient.from_DSN('https+' + self.dsn_string)
self.assertEqual('https://host1:8086', cli._client._baseurl)

cli = InfluxDBClusterClient.from_DSN('https+' + self.dsn_string,
**{'ssl': False})
self.assertEqual('http://host1:8086', cli._client._baseurl)

def test_dsn_password_caps(self):
cli = InfluxDBClusterClient.from_DSN(
'https+influxdb://usr:pWd@host:8086/db')
self.assertEqual('pWd', cli._client._password)

def test_dsn_mixed_scheme_case(self):
cli = InfluxDBClusterClient.from_DSN(
'hTTps+inFLUxdb://usr:pWd@host:8086/db')
self.assertEqual('pWd', cli._client._password)
self.assertEqual('https://host:8086', cli._client._baseurl)

cli = InfluxDBClusterClient.from_DSN(
'uDP+influxdb://usr:pwd@host1:8086,usr:pwd@host2:8086/db')
self.assertTrue(cli._client.use_udp)