From 227bf1787f03980934ae909e53c2c008abdc613e Mon Sep 17 00:00:00 2001 From: Tong Zhigao Date: Fri, 13 Dec 2019 12:35:52 +0800 Subject: [PATCH 1/2] init --- cluster_manage/.gitignore | 3 + cluster_manage/README.md | 26 + cluster_manage/check_lib.py | 13 + cluster_manage/conf.py | 21 + cluster_manage/etcd/__init__.py | 317 ++++++ cluster_manage/etcd/auth.py | 255 +++++ cluster_manage/etcd/client.py | 993 +++++++++++++++++++ cluster_manage/etcd/lock.py | 178 ++++ cluster_manage/flash_cluster_manager.py | 275 +++++ cluster_manage/flash_http_client.py | 47 + cluster_manage/flash_tools.py | 36 + cluster_manage/pd_client.py | 144 +++ cluster_manage/placement_rule.py | 50 + cluster_manage/release.sh | 52 + cluster_manage/tidb_tools.py | 57 ++ cluster_manage/tikv_util/.clang-format | 5 + cluster_manage/tikv_util/.gitignore | 2 + cluster_manage/tikv_util/__init__.py | 1 + cluster_manage/tikv_util/build.sh | 11 + cluster_manage/tikv_util/check_region_cnt.cc | 21 + cluster_manage/tikv_util/check_region_cnt.h | 20 + cluster_manage/tikv_util/codec.h | 181 ++++ cluster_manage/tikv_util/exception_util.h | 14 + cluster_manage/tikv_util/export.cc | 28 + cluster_manage/tikv_util/format.sh | 9 + cluster_manage/tikv_util/setup.py | 111 +++ cluster_manage/tikv_util/test.py | 63 ++ cluster_manage/tikv_util/tikv_key.cc | 66 ++ cluster_manage/tikv_util/tikv_key.h | 49 + cluster_manage/tikv_util/type.h | 9 + cluster_manage/timer.py | 53 + cluster_manage/util.py | 118 +++ 32 files changed, 3228 insertions(+) create mode 100644 cluster_manage/.gitignore create mode 100644 cluster_manage/README.md create mode 100644 cluster_manage/check_lib.py create mode 100644 cluster_manage/conf.py create mode 100644 cluster_manage/etcd/__init__.py create mode 100644 cluster_manage/etcd/auth.py create mode 100644 cluster_manage/etcd/client.py create mode 100644 cluster_manage/etcd/lock.py create mode 100644 cluster_manage/flash_cluster_manager.py create mode 100644 cluster_manage/flash_http_client.py 
create mode 100644 cluster_manage/flash_tools.py create mode 100644 cluster_manage/pd_client.py create mode 100644 cluster_manage/placement_rule.py create mode 100755 cluster_manage/release.sh create mode 100644 cluster_manage/tidb_tools.py create mode 100644 cluster_manage/tikv_util/.clang-format create mode 100644 cluster_manage/tikv_util/.gitignore create mode 100644 cluster_manage/tikv_util/__init__.py create mode 100755 cluster_manage/tikv_util/build.sh create mode 100644 cluster_manage/tikv_util/check_region_cnt.cc create mode 100644 cluster_manage/tikv_util/check_region_cnt.h create mode 100644 cluster_manage/tikv_util/codec.h create mode 100644 cluster_manage/tikv_util/exception_util.h create mode 100644 cluster_manage/tikv_util/export.cc create mode 100755 cluster_manage/tikv_util/format.sh create mode 100644 cluster_manage/tikv_util/setup.py create mode 100644 cluster_manage/tikv_util/test.py create mode 100644 cluster_manage/tikv_util/tikv_key.cc create mode 100644 cluster_manage/tikv_util/tikv_key.h create mode 100644 cluster_manage/tikv_util/type.h create mode 100644 cluster_manage/timer.py create mode 100644 cluster_manage/util.py diff --git a/cluster_manage/.gitignore b/cluster_manage/.gitignore new file mode 100644 index 00000000000..35716b244e3 --- /dev/null +++ b/cluster_manage/.gitignore @@ -0,0 +1,3 @@ +/dist/ +*.spec +version.py diff --git a/cluster_manage/README.md b/cluster_manage/README.md new file mode 100644 index 00000000000..672cd97dc54 --- /dev/null +++ b/cluster_manage/README.md @@ -0,0 +1,26 @@ +# TiFlash Cluster Manage + +- It works as `Cluster Manage` for `TiFlash` to interact with `TiDB` / `PD` / `TiKV`, run with each flash node. +A master will be elected by using `etcd` to manage the whole flash cluster. Periodically scan tables with column +replicas configuration in tidb and try to build flash engine peers or monitor their status. 
+- language: `Python` `C/C++` + +## Prepare + +* install `python3.7`, `pybind11`, `pyinstaller`, `clang-format`, `dnspython`, `uri`, `requests`, `urllib3` +, `toml`, `dnspython`, `C++17` +, `setuptools` + +* use `release.sh` and get dir `dist/flash_cluster_manager/`. + +## Run + +* show version +``` +./dist/flash_cluster_manager/flash_cluster_manager -v +``` + +* run cmd: +``` +./dist/flash_cluster_manager/flash_cluster_manager --config [path-to-flash-config.toml] +``` \ No newline at end of file diff --git a/cluster_manage/check_lib.py b/cluster_manage/check_lib.py new file mode 100644 index 00000000000..3b90c363031 --- /dev/null +++ b/cluster_manage/check_lib.py @@ -0,0 +1,13 @@ +if __name__ == '__main__': + try: + import dns + import requests + import uri + import urllib3 + import toml + import pybind11 + import setuptools + except Exception as e: + print(e) + exit(-1) + exit(0) diff --git a/cluster_manage/conf.py b/cluster_manage/conf.py new file mode 100644 index 00000000000..5f705dcc4fa --- /dev/null +++ b/cluster_manage/conf.py @@ -0,0 +1,21 @@ +#!/usr/bin/python3 +import logging +import argparse +import version + +version_info = '' +for d in dir(version): + if not d.startswith('__'): + version_info += '{}: {}\n'.format(d, getattr(version, d)) + +parser = argparse.ArgumentParser(description='TiFlash Cluster Manager', formatter_class=argparse.RawTextHelpFormatter) +parser.add_argument('--version', '-v', help='show version', action='version', version=version_info) +parser.add_argument('--config', help='path of config file *.toml', required=True) +parser.add_argument('--log_level', help='log level', default='INFO', choices=['INFO', 'DEBUG', 'WARN']) + +args = parser.parse_args() + +import flash_tools + +flash_conf = flash_tools.FlashConfig(args.config) +log_level = logging._nameToLevel.get(args.log_level) diff --git a/cluster_manage/etcd/__init__.py b/cluster_manage/etcd/__init__.py new file mode 100644 index 00000000000..33b1d679491 --- /dev/null +++ 
b/cluster_manage/etcd/__init__.py @@ -0,0 +1,317 @@ +import logging +from .client import Client +from .lock import Lock + +_log = logging.getLogger(__name__) + +# Prevent "no handler" warnings to stderr in projects that do not configure +# logging. +try: + from logging import NullHandler +except ImportError: + # Python <2.7, just define it. + class NullHandler(logging.Handler): + def emit(self, record): + pass +_log.addHandler(NullHandler()) + + +class EtcdResult(object): + _node_props = { + 'key': None, + 'value': None, + 'expiration': None, + 'ttl': None, + 'modifiedIndex': None, + 'createdIndex': None, + 'newKey': False, + 'dir': False, + } + + def __init__(self, action=None, node=None, prevNode=None, **kwdargs): + """ + Creates an EtcdResult object. + + Args: + action (str): The action that resulted in key creation + + node (dict): The dictionary containing all node information. + + prevNode (dict): The dictionary containing previous node information. + + """ + self.action = action + for (key, default) in self._node_props.items(): + if key in node: + setattr(self, key, node[key]) + else: + setattr(self, key, default) + + self._children = [] + if self.dir and 'nodes' in node: + # We keep the data in raw format, converting them only when needed + self._children = node['nodes'] + + if prevNode: + self._prev_node = EtcdResult(None, node=prevNode) + # See issue 38: when returning a write() op etcd has a bogus result. + if self._prev_node.dir and not self.dir: + self.dir = True + + def parse_headers(self, response): + headers = response.getheaders() + self.etcd_index = int(headers.get('x-etcd-index', 1)) + self.raft_index = int(headers.get('x-raft-index', 1)) + + def get_subtree(self, leaves_only=False): + """ + Get all the subtree resulting from a recursive=true call to etcd. 
+ + Args: + leaves_only (bool): if true, only value nodes are returned + + + """ + if not self._children: + #if the current result is a leaf, return itself + yield self + return + else: + # node is not a leaf + if not leaves_only: + yield self + for n in self._children: + node = EtcdResult(None, n) + for child in node.get_subtree(leaves_only=leaves_only): + yield child + return + + @property + def leaves(self): + return self.get_subtree(leaves_only=True) + + @property + def children(self): + """ Deprecated, use EtcdResult.leaves instead """ + return self.leaves + + def __eq__(self, other): + if not (type(self) is type(other)): + return False + for k in self._node_props.keys(): + try: + a = getattr(self, k) + b = getattr(other, k) + if a != b: + return False + except: + return False + return True + + def __ne__(self, other): + return not self.__eq__(other) + + def __repr__(self): + return "%s(%r)" % (self.__class__, self.__dict__) + + +class EtcdException(Exception): + + """ + Generic Etcd Exception. + """ + def __init__(self, message=None, payload=None): + super(EtcdException, self).__init__(message) + self.payload = payload + + +class EtcdValueError(EtcdException, ValueError): + """ + Base class for Etcd value-related errors. + """ + pass + + +class EtcdCompareFailed(EtcdValueError): + """ + Compare-and-swap failure + """ + pass + + +class EtcdClusterIdChanged(EtcdException): + """ + The etcd cluster ID changed. This may indicate the cluster was replaced + with a backup. Raised to prevent waiting on an etcd_index that was only + valid on the old cluster. 
+ """ + pass + + +class EtcdKeyError(EtcdException): + """ + Etcd Generic KeyError Exception + """ + pass + + +class EtcdKeyNotFound(EtcdKeyError): + """ + Etcd key not found exception (100) + """ + pass + + +class EtcdNotFile(EtcdKeyError): + """ + Etcd not a file exception (102) + """ + pass + + +class EtcdNotDir(EtcdKeyError): + """ + Etcd not a directory exception (104) + """ + pass + + +class EtcdAlreadyExist(EtcdKeyError): + """ + Etcd already exist exception (105) + """ + pass + + +class EtcdEventIndexCleared(EtcdException): + """ + Etcd event index is outdated and cleared exception (401) + """ + pass + + +class EtcdConnectionFailed(EtcdException): + """ + Connection to etcd failed. + """ + def __init__(self, message=None, payload=None, cause=None): + super(EtcdConnectionFailed, self).__init__(message=message, + payload=payload) + self.cause = cause + + +class EtcdInsufficientPermissions(EtcdException): + """ + Request failed because of insufficient permissions. + """ + pass + + +class EtcdWatchTimedOut(EtcdConnectionFailed): + """ + A watch timed out without returning a result. + """ + pass + + +class EtcdWatcherCleared(EtcdException): + """ + Watcher is cleared due to etcd recovery. + """ + pass + + +class EtcdLeaderElectionInProgress(EtcdException): + """ + Request failed due to in-progress leader election. + """ + pass + + +class EtcdRootReadOnly(EtcdKeyError): + """ + Operation is not valid on the root, which is read only. + """ + pass + + +class EtcdDirNotEmpty(EtcdValueError): + """ + Directory not empty. + """ + pass + + +class EtcdLockExpired(EtcdException): + """ + Our lock apparently expired while we were trying to acquire it. + """ + pass + + +class EtcdError(object): + # See https://github.com/coreos/etcd/blob/master/Documentation/v2/errorcode.md + error_exceptions = { + 100: EtcdKeyNotFound, + 101: EtcdCompareFailed, + 102: EtcdNotFile, + # 103: Non-public: no more peers. 
+ 104: EtcdNotDir, + 105: EtcdAlreadyExist, + # 106: Non-public: key is preserved. + 107: EtcdRootReadOnly, + 108: EtcdDirNotEmpty, + # 109: Non-public: existing peer addr. + 110: EtcdInsufficientPermissions, + + 200: EtcdValueError, # Not part of v2 + 201: EtcdValueError, + 202: EtcdValueError, + 203: EtcdValueError, + 204: EtcdValueError, + 205: EtcdValueError, + 206: EtcdValueError, + 207: EtcdValueError, + 208: EtcdValueError, + 209: EtcdValueError, + 210: EtcdValueError, + + # 300: Non-public: Raft internal error. + 301: EtcdLeaderElectionInProgress, + + 400: EtcdWatcherCleared, + 401: EtcdEventIndexCleared, + } + + @classmethod + def handle(cls, payload): + """ + Decodes the error and throws the appropriate error message + + :param payload: The decoded JSON error payload as a dict. + """ + error_code = payload.get("errorCode") + message = payload.get("message") + cause = payload.get("cause") + msg = '{} : {}'.format(message, cause) + status = payload.get("status") + # Some general status handling, as + # not all endpoints return coherent error messages + if status == 404: + error_code = 100 + elif status == 401: + error_code = 110 + exc = cls.error_exceptions.get(error_code, EtcdException) + if issubclass(exc, EtcdException): + raise exc(msg, payload) + else: + raise exc(msg) + + +# Attempt to enable urllib3's SNI support, if possible +# Blatantly copied from requests. 
+try: + from urllib3.contrib import pyopenssl + pyopenssl.inject_into_urllib3() +except ImportError: + pass diff --git a/cluster_manage/etcd/auth.py b/cluster_manage/etcd/auth.py new file mode 100644 index 00000000000..796772d73fd --- /dev/null +++ b/cluster_manage/etcd/auth.py @@ -0,0 +1,255 @@ +import json + +import logging +import etcd + +_log = logging.getLogger(__name__) + + +class EtcdAuthBase(object): + entity = 'example' + + def __init__(self, client, name): + self.client = client + self.name = name + self.uri = "{}/auth/{}s/{}".format(self.client.version_prefix, + self.entity, self.name) + + @property + def names(self): + key = "{}s".format(self.entity) + uri = "{}/auth/{}".format(self.client.version_prefix, key) + response = self.client.api_execute(uri, self.client._MGET) + return json.loads(response.data.decode('utf-8'))[key] + + def read(self): + try: + response = self.client.api_execute(self.uri, self.client._MGET) + except etcd.EtcdInsufficientPermissions as e: + _log.error("Any action on the authorization requires the root role") + raise + except etcd.EtcdKeyNotFound: + _log.info("%s '%s' not found", self.entity, self.name) + raise + except Exception as e: + _log.error("Failed to fetch %s in %s%s: %r", + self.entity, self.client._base_uri, + self.client.version_prefix, e) + raise etcd.EtcdException( + "Could not fetch {} '{}'".format(self.entity, self.name)) + + self._from_net(response.data) + + def write(self): + try: + r = self.__class__(self.client, self.name) + r.read() + except etcd.EtcdKeyNotFound: + r = None + try: + for payload in self._to_net(r): + response = self.client.api_execute_json(self.uri, + self.client._MPUT, + params=payload) + # This will fail if the response is an error + self._from_net(response.data) + except etcd.EtcdInsufficientPermissions as e: + _log.error("Any action on the authorization requires the root role") + raise + except Exception as e: + _log.error("Failed to write %s '%s'", self.entity, self.name) + # TODO: 
fine-grained exception handling + raise etcd.EtcdException( + "Could not write {} '{}': {}".format(self.entity, + self.name, e)) + + def delete(self): + try: + _ = self.client.api_execute(self.uri, self.client._MDELETE) + except etcd.EtcdInsufficientPermissions as e: + _log.error("Any action on the authorization requires the root role") + raise + except etcd.EtcdKeyNotFound: + _log.info("%s '%s' not found", self.entity, self.name) + raise + except Exception as e: + _log.error("Failed to delete %s in %s%s: %r", + self.entity, self._base_uri, self.version_prefix, e) + raise etcd.EtcdException( + "Could not delete {} '{}'".format(self.entity, self.name)) + + def _from_net(self, data): + raise NotImplementedError() + + def _to_net(self, old=None): + raise NotImplementedError() + + @classmethod + def new(cls, client, data): + c = cls(client, data[cls.entity]) + c._from_net(data) + return c + + +class EtcdUser(EtcdAuthBase): + """Class to manage in a orm-like way etcd users""" + entity = 'user' + + def __init__(self, client, name): + super(EtcdUser, self).__init__(client, name) + self._roles = set() + self._password = None + + def _from_net(self, data): + d = json.loads(data.decode('utf-8')) + self.roles = d.get('roles', []) + self.name = d.get('user') + + def _to_net(self, prevobj=None): + if prevobj is None: + retval = [{"user": self.name, "password": self._password, + "roles": list(self.roles)}] + else: + retval = [] + if self._password: + retval.append({"user": self.name, "password": self._password}) + to_grant = list(self.roles - prevobj.roles) + to_revoke = list(prevobj.roles - self.roles) + if to_grant: + retval.append({"user": self.name, "grant": to_grant}) + if to_revoke: + retval.append({"user": self.name, "revoke": to_revoke}) + # Let's blank the password now + # Even if the user can't be written we don't want it to leak anymore. 
+ self._password = None + return retval + + @property + def roles(self): + return self._roles + + @roles.setter + def roles(self, val): + self._roles = set(val) + + @property + def password(self): + """Empty property for password.""" + return None + + @password.setter + def password(self, new_password): + """Change user's password.""" + self._password = new_password + + def __str__(self): + return json.dumps(self._to_net()[0]) + + + +class EtcdRole(EtcdAuthBase): + entity = 'role' + + def __init__(self, client, name): + super(EtcdRole, self).__init__(client, name) + self._read_paths = set() + self._write_paths = set() + + def _from_net(self, data): + d = json.loads(data.decode('utf-8')) + self.name = d.get('role') + + try: + kv = d["permissions"]["kv"] + except: + self._read_paths = set() + self._write_paths = set() + return + + self._read_paths = set(kv.get('read', [])) + self._write_paths = set(kv.get('write', [])) + + def _to_net(self, prevobj=None): + retval = [] + if prevobj is None: + retval.append({ + "role": self.name, + "permissions": + { + "kv": + { + "read": list(self._read_paths), + "write": list(self._write_paths) + } + } + }) + else: + to_grant = { + 'read': list(self._read_paths - prevobj._read_paths), + 'write': list(self._write_paths - prevobj._write_paths) + } + to_revoke = { + 'read': list(prevobj._read_paths - self._read_paths), + 'write': list(prevobj._write_paths - self._write_paths) + } + if [path for sublist in to_revoke.values() for path in sublist]: + retval.append({'role': self.name, 'revoke': {'kv': to_revoke}}) + if [path for sublist in to_grant.values() for path in sublist]: + retval.append({'role': self.name, 'grant': {'kv': to_grant}}) + return retval + + def grant(self, path, permission): + if permission.upper().find('R') >= 0: + self._read_paths.add(path) + if permission.upper().find('W') >= 0: + self._write_paths.add(path) + + def revoke(self, path, permission): + if permission.upper().find('R') >= 0 and \ + path in 
self._read_paths: + self._read_paths.remove(path) + if permission.upper().find('W') >= 0 and \ + path in self._write_paths: + self._write_paths.remove(path) + + @property + def acls(self): + perms = {} + try: + for path in self._read_paths: + perms[path] = 'R' + for path in self._write_paths: + if path in perms: + perms[path] += 'W' + else: + perms[path] = 'W' + except: + pass + return perms + + @acls.setter + def acls(self, acls): + self._read_paths = set() + self._write_paths = set() + for path, permission in acls.items(): + self.grant(path, permission) + + def __str__(self): + return json.dumps({"role": self.name, 'acls': self.acls}) + + +class Auth(object): + def __init__(self, client): + self.client = client + self.uri = "{}/auth/enable".format(self.client.version_prefix) + + @property + def active(self): + resp = self.client.api_execute(self.uri, self.client._MGET) + return json.loads(resp.data.decode('utf-8'))['enabled'] + + @active.setter + def active(self, value): + if value != self.active: + method = value and self.client._MPUT or self.client._MDELETE + self.client.api_execute(self.uri, method) diff --git a/cluster_manage/etcd/client.py b/cluster_manage/etcd/client.py new file mode 100644 index 00000000000..74ad8fc1b5b --- /dev/null +++ b/cluster_manage/etcd/client.py @@ -0,0 +1,993 @@ +""" +.. module:: python-etcd + :synopsis: A python etcd client. + +.. 
moduleauthor:: Jose Plana + + +""" +import logging +try: + # Python 3 + from http.client import HTTPException +except ImportError: + # Python 2 + from httplib import HTTPException +import socket +import urllib3 +from urllib3.exceptions import HTTPError +from urllib3.exceptions import ReadTimeoutError +import json +import ssl +import dns.resolver +from functools import wraps +import etcd + +try: + from urlparse import urlparse +except ImportError: + from urllib.parse import urlparse + + +_log = logging.getLogger(__name__) + + +class Client(object): + + """ + Client for etcd, the distributed log service using raft. + """ + + _MGET = 'GET' + _MPUT = 'PUT' + _MPOST = 'POST' + _MDELETE = 'DELETE' + _comparison_conditions = set(('prevValue', 'prevIndex', 'prevExist', 'refresh')) + _read_options = set(('recursive', 'wait', 'waitIndex', 'sorted', 'quorum')) + _del_conditions = set(('prevValue', 'prevIndex')) + + http = None + + def __init__( + self, + host='127.0.0.1', + port=4001, + srv_domain=None, + version_prefix='/v2', + read_timeout=60, + allow_redirect=True, + protocol='http', + cert=None, + ca_cert=None, + username=None, + password=None, + allow_reconnect=False, + use_proxies=False, + expected_cluster_id=None, + per_host_pool_size=10, + lock_prefix="/_locks" + ): + """ + Initialize the client. + + Args: + host (mixed): + If a string, IP to connect to. + If a tuple ((host, port), (host, port), ...) + + port (int): Port used to connect to etcd. + + srv_domain (str): Domain to search the SRV record for cluster autodiscovery. + + version_prefix (str): Url or version prefix in etcd url (default=/v2). + + read_timeout (int): max seconds to wait for a read. + + allow_redirect (bool): allow the client to connect to other nodes. + + protocol (str): Protocol used to connect to etcd. + + cert (mixed): If a string, the whole ssl client certificate; + if a tuple, the cert and key file names. + + ca_cert (str): The ca certificate. If pressent it will enable + validation. 
+ + username (str): username for etcd authentication. + + password (str): password for etcd authentication. + + allow_reconnect (bool): allow the client to reconnect to another + etcd server in the cluster in the case the + default one does not respond. + + use_proxies (bool): we are using a list of proxies to which we connect, + and don't want to connect to the original etcd cluster. + + expected_cluster_id (str): If a string, recorded as the expected + UUID of the cluster (rather than + learning it from the first request), + reads will raise EtcdClusterIdChanged + if they receive a response with a + different cluster ID. + per_host_pool_size (int): specifies maximum number of connections to pool + by host. By default this will use up to 10 + connections. + lock_prefix (str): Set the key prefix at etcd when client to lock object. + By default this will be use /_locks. + """ + + # If a DNS record is provided, use it to get the hosts list + if srv_domain is not None: + try: + host = self._discover(srv_domain) + except Exception as e: + _log.error("Could not discover the etcd hosts from %s: %s", + srv_domain, e) + + self._protocol = protocol + + def uri(protocol, host, port): + return '%s://%s:%d' % (protocol, host, port) + + if not isinstance(host, tuple): + self._machines_cache = [] + self._base_uri = uri(self._protocol, host, port) + else: + if not allow_reconnect: + _log.error("List of hosts incompatible with allow_reconnect.") + raise etcd.EtcdException("A list of hosts to connect to was given, but reconnection not allowed?") + self._machines_cache = [uri(self._protocol, *conn) for conn in host] + self._base_uri = self._machines_cache.pop(0) + + self.expected_cluster_id = expected_cluster_id + self.version_prefix = version_prefix + + self._read_timeout = read_timeout + self._allow_redirect = allow_redirect + self._use_proxies = use_proxies + self._allow_reconnect = allow_reconnect + self._lock_prefix = lock_prefix + + # SSL Client certificate support + + kw = { 
+ 'maxsize': per_host_pool_size + } + + if self._read_timeout > 0: + kw['timeout'] = self._read_timeout + + if cert: + if isinstance(cert, tuple): + # Key and cert are separate + kw['cert_file'] = cert[0] + kw['key_file'] = cert[1] + else: + # combined certificate + kw['cert_file'] = cert + + if ca_cert: + kw['ca_certs'] = ca_cert + kw['cert_reqs'] = ssl.CERT_REQUIRED + + self.username = None + self.password = None + if username and password: + self.username = username + self.password = password + elif username: + _log.warning('Username provided without password, both are required for authentication') + elif password: + _log.warning('Password provided without username, both are required for authentication') + + self.http = urllib3.PoolManager(num_pools=10, **kw) + + _log.debug("New etcd client created for %s", self.base_uri) + + if self._allow_reconnect: + # we need the set of servers in the cluster in order to try + # reconnecting upon error. The cluster members will be + # added to the hosts list you provided. If you are using + # proxies, set all + # + # Beware though: if you input '127.0.0.1' as your host and + # etcd advertises 'localhost', both will be in the + # resulting list. + + # If we're connecting to the original cluster, we can + # extend the list given to the client with what we get + # from self.machines + if not self._use_proxies: + self._machines_cache = list(set(self._machines_cache) | + set(self.machines)) + if self._base_uri in self._machines_cache: + self._machines_cache.remove(self._base_uri) + _log.debug("Machines cache initialised to %s", + self._machines_cache) + + # Versions set to None. They will be set upon first usage. + self._version = self._cluster_version = None + + def _set_version_info(self): + """ + Sets the version information provided by the server. 
+ """ + # Set the version + version_info = json.loads(self.http.request( + self._MGET, + self._base_uri + '/version', + headers=self._get_headers(), + timeout=self.read_timeout, + redirect=self.allow_redirect).data.decode('utf-8')) + self._version = version_info['etcdserver'] + self._cluster_version = version_info['etcdcluster'] + + def _discover(self, domain): + srv_name = "_etcd._tcp.{}".format(domain) + answers = dns.resolver.query(srv_name, 'SRV') + hosts = [] + for answer in answers: + hosts.append( + (answer.target.to_text(omit_final_dot=True), answer.port)) + _log.debug("Found %s", hosts) + if not len(hosts): + raise ValueError("The SRV record is present but no host were found") + return tuple(hosts) + + def __del__(self): + """Clean up open connections""" + if self.http is not None: + try: + self.http.clear() + except ReferenceError: + # this may hit an already-cleared weakref + pass + + @property + def base_uri(self): + """URI used by the client to connect to etcd.""" + return self._base_uri + + @property + def host(self): + """Node to connect etcd.""" + return urlparse(self._base_uri).netloc.split(':')[0] + + @property + def port(self): + """Port to connect etcd.""" + return int(urlparse(self._base_uri).netloc.split(':')[1]) + + @property + def protocol(self): + """Protocol used to connect etcd.""" + return self._protocol + + @property + def read_timeout(self): + """Max seconds to wait for a read.""" + return self._read_timeout + + @property + def allow_redirect(self): + """Allow the client to connect to other nodes.""" + return self._allow_redirect + + @property + def lock_prefix(self): + """Get the key prefix at etcd when client to lock object.""" + return self._lock_prefix + + @property + def machines(self): + """ + Members of the cluster. + + Returns: + list. str with all the nodes in the cluster. 
+ + >>> print client.machines + ['http://127.0.0.1:4001', 'http://127.0.0.1:4002'] + """ + # We can't use api_execute here, or it causes a logical loop + try: + uri = self._base_uri + self.version_prefix + '/machines' + response = self.http.request( + self._MGET, + uri, + headers=self._get_headers(), + timeout=self.read_timeout, + redirect=self.allow_redirect) + + machines = [ + node.strip() for node in + self._handle_server_response(response).data.decode('utf-8').split(',') + ] + _log.debug("Retrieved list of machines: %s", machines) + return machines + except (HTTPError, HTTPException, socket.error) as e: + # We can't get the list of machines, if one server is in the + # machines cache, try on it + _log.error("Failed to get list of machines from %s%s: %r", + self._base_uri, self.version_prefix, e) + if self._machines_cache: + self._base_uri = self._machines_cache.pop(0) + _log.info("Retrying on %s", self._base_uri) + # Call myself + return self.machines + else: + raise etcd.EtcdException("Could not get the list of servers, " + "maybe you provided the wrong " + "host(s) to connect to?") + + @property + def members(self): + """ + A more structured view of peers in the cluster. + + Note that while we have an internal DS called _members, accessing the public property will call etcd. + """ + # Empty the members list + self._members = {} + try: + data = self.api_execute(self.version_prefix + '/members', + self._MGET).data.decode('utf-8') + res = json.loads(data) + for member in res['members']: + self._members[member['id']] = member + return self._members + except: + raise etcd.EtcdException("Could not get the members list, maybe the cluster has gone away?") + + @property + def leader(self): + """ + Returns: + dict. the leader of the cluster. 
+ + >>> print client.leader + {"id":"ce2a822cea30bfca","name":"default","peerURLs":["http://localhost:2380","http://localhost:7001"],"clientURLs":["http://127.0.0.1:4001"]} + """ + try: + + leader = json.loads( + self.api_execute(self.version_prefix + '/stats/self', + self._MGET).data.decode('utf-8')) + return self.members[leader['leaderInfo']['leader']] + except Exception as e: + raise etcd.EtcdException("Cannot get leader data: %s" % e) + + @property + def stats(self): + """ + Returns: + dict. the stats of the local server + """ + return self._stats() + + @property + def leader_stats(self): + """ + Returns: + dict. the stats of the leader + """ + return self._stats('leader') + + @property + def store_stats(self): + """ + Returns: + dict. the stats of the kv store + """ + return self._stats('store') + + def _stats(self, what='self'): + """ Internal method to access the stats endpoints""" + data = self.api_execute(self.version_prefix + + '/stats/' + what, self._MGET).data.decode('utf-8') + try: + return json.loads(data) + except (TypeError,ValueError): + raise etcd.EtcdException("Cannot parse json data in the response") + + @property + def version(self): + """ + Version of etcd. + """ + if not self._version: + self._set_version_info() + return self._version + + @property + def cluster_version(self): + """ + Version of the etcd cluster. + """ + if not self._cluster_version: + self._set_version_info() + + return self._cluster_version + + @property + def key_endpoint(self): + """ + REST key endpoint. + """ + return self.version_prefix + '/keys' + + def __contains__(self, key): + """ + Check if a key is available in the cluster. 
+ + >>> print 'key' in client + True + """ + try: + self.get(key) + return True + except etcd.EtcdKeyNotFound: + return False + + def _sanitize_key(self, key): + if not key.startswith('/'): + key = "/{}".format(key) + return key + + def write(self, key, value, ttl=None, dir=False, append=False, **kwdargs): + """ + Writes the value for a key, possibly doing atomic Compare-and-Swap + + Args: + key (str): Key. + + value (object): value to set + + ttl (int): Time in seconds of expiration (optional). + + dir (bool): Set to true if we are writing a directory; default is false. + + append (bool): If true, it will post to append the new value to the dir, creating a sequential key. Defaults to false. + + Other parameters modifying the write method are accepted: + + + prevValue (str): compare key to this value, and swap only if corresponding (optional). + + prevIndex (int): modify key only if actual modifiedIndex matches the provided one (optional). + + prevExist (bool): If false, only create key; if true, only update key. + + refresh (bool): since 2.3.0, If true, only update the ttl, prev key must existed(prevExist=True). 
+ + Returns: + client.EtcdResult + + >>> print client.write('/key', 'newValue', ttl=60, prevExist=False).value + 'newValue' + + """ + _log.debug("Writing %s to key %s ttl=%s dir=%s append=%s", + value, key, ttl, dir, append) + key = self._sanitize_key(key) + params = {} + if value is not None: + params['value'] = value + + if ttl is not None: + params['ttl'] = ttl + + if dir: + if value: + raise etcd.EtcdException( + 'Cannot create a directory with a value') + params['dir'] = "true" + + for (k, v) in kwdargs.items(): + if k in self._comparison_conditions: + if type(v) == bool: + params[k] = v and "true" or "false" + else: + params[k] = v + + method = append and self._MPOST or self._MPUT + if '_endpoint' in kwdargs: + path = kwdargs['_endpoint'] + key + else: + path = self.key_endpoint + key + + response = self.api_execute(path, method, params=params) + return self._result_from_response(response) + + def refresh(self, key, ttl, **kwdargs): + """ + (Since 2.3.0) Refresh the ttl of a key without notifying watchers. + + Keys in etcd can be refreshed without notifying watchers, + this can be achieved by setting the refresh to true when updating a TTL + + You cannot update the value of a key when refreshing it + + @see: https://github.com/coreos/etcd/blob/release-2.3/Documentation/api.md#refreshing-key-ttl + + Args: + key (str): Key. + + ttl (int): Time in seconds of expiration (optional). + + Other parameters modifying the write method are accepted as `EtcdClient.write`. + """ + # overwrite kwdargs' prevExist + kwdargs['prevExist'] = True + return self.write(key=key, value=None, ttl=ttl, refresh=True, **kwdargs) + + def update(self, obj): + """ + Updates the value for a key atomically. Typical usage would be: + + c = etcd.Client() + o = c.read("/somekey") + o.value += 1 + c.update(o) + + Args: + obj (etcd.EtcdResult): The object that needs updating. 
+ + """ + _log.debug("Updating %s to %s.", obj.key, obj.value) + kwdargs = { + 'dir': obj.dir, + 'ttl': obj.ttl, + 'prevExist': True + } + + if not obj.dir: + # prevIndex on a dir causes a 'not a file' error. d'oh! + kwdargs['prevIndex'] = obj.modifiedIndex + return self.write(obj.key, obj.value, **kwdargs) + + def read(self, key, **kwdargs): + """ + Returns the value of the key 'key'. + + Args: + key (str): Key. + + Recognized kwd args + + recursive (bool): If you should fetch recursively a dir + + wait (bool): If we should wait and return next time the key is changed + + waitIndex (int): The index to fetch results from. + + sorted (bool): Sort the output keys (alphanumerically) + + timeout (int): max seconds to wait for a read. + + Returns: + client.EtcdResult (or an array of client.EtcdResult if a + subtree is queried) + + Raises: + KeyValue: If the key doesn't exists. + + urllib3.exceptions.TimeoutError: If timeout is reached. + + >>> print client.get('/key').value + 'value' + + """ + _log.debug("Issuing read for key %s with args %s", key, kwdargs) + key = self._sanitize_key(key) + + params = {} + for (k, v) in kwdargs.items(): + if k in self._read_options: + if type(v) == bool: + params[k] = v and "true" or "false" + elif v is not None: + params[k] = v + + timeout = kwdargs.get('timeout', None) + + response = self.api_execute( + self.key_endpoint + key, self._MGET, params=params, + timeout=timeout) + return self._result_from_response(response) + + def delete(self, key, recursive=None, dir=None, **kwdargs): + """ + Removed a key from etcd. + + Args: + + key (str): Key. + + recursive (bool): if we want to recursively delete a directory, set + it to true + + dir (bool): if we want to delete a directory, set it to true + + prevValue (str): compare key to this value, and swap only if + corresponding (optional). + + prevIndex (int): modify key only if actual modifiedIndex matches the + provided one (optional). 
+ + Returns: + client.EtcdResult + + Raises: + KeyValue: If the key doesn't exists. + + >>> print client.delete('/key').key + '/key' + + """ + _log.debug("Deleting %s recursive=%s dir=%s extra args=%s", + key, recursive, dir, kwdargs) + key = self._sanitize_key(key) + + kwds = {} + if recursive is not None: + kwds['recursive'] = recursive and "true" or "false" + if dir is not None: + kwds['dir'] = dir and "true" or "false" + + for k in self._del_conditions: + if k in kwdargs: + kwds[k] = kwdargs[k] + _log.debug("Calculated params = %s", kwds) + + response = self.api_execute( + self.key_endpoint + key, self._MDELETE, params=kwds) + return self._result_from_response(response) + + def pop(self, key, recursive=None, dir=None, **kwdargs): + """ + Remove specified key from etcd and return the corresponding value. + + Args: + + key (str): Key. + + recursive (bool): if we want to recursively delete a directory, set + it to true + + dir (bool): if we want to delete a directory, set it to true + + prevValue (str): compare key to this value, and swap only if + corresponding (optional). + + prevIndex (int): modify key only if actual modifiedIndex matches the + provided one (optional). + + Returns: + client.EtcdResult + + Raises: + KeyValue: If the key doesn't exists. + + >>> print client.pop('/key').value + 'value' + + """ + return self.delete(key=key, recursive=recursive, dir=dir, **kwdargs)._prev_node + + # Higher-level methods on top of the basic primitives + def test_and_set(self, key, value, prev_value, ttl=None): + """ + Atomic test & set operation. + It will check if the value of 'key' is 'prev_value', + if the the check is correct will change the value for 'key' to 'value' + if the the check is false an exception will be raised. + + Args: + key (str): Key. + value (object): value to set + prev_value (object): previous value. + ttl (int): Time in seconds of expiration (optional). 
+ + Returns: + client.EtcdResult + + Raises: + ValueError: When the 'prev_value' is not the current value. + + >>> print client.test_and_set('/key', 'new', 'old', ttl=60).value + 'new' + + """ + return self.write(key, value, prevValue=prev_value, ttl=ttl) + + def set(self, key, value, ttl=None): + """ + Compatibility: sets the value of the key 'key' to the value 'value' + + Args: + key (str): Key. + value (object): value to set + ttl (int): Time in seconds of expiration (optional). + + Returns: + client.EtcdResult + + Raises: + etcd.EtcdException: when something weird goes wrong. + + """ + return self.write(key, value, ttl=ttl) + + def get(self, key): + """ + Returns the value of the key 'key'. + + Args: + key (str): Key. + + Returns: + client.EtcdResult + + Raises: + KeyError: If the key doesn't exists. + + >>> print client.get('/key').value + 'value' + + """ + return self.read(key) + + def watch(self, key, index=None, timeout=None, recursive=None): + """ + Blocks until a new event has been received, starting at index 'index' + + Args: + key (str): Key. + + index (int): Index to start from. + + timeout (int): max seconds to wait for a read. + + Returns: + client.EtcdResult + + Raises: + KeyValue: If the key doesn't exist. + + etcd.EtcdWatchTimedOut: If timeout is reached. + + >>> print client.watch('/key').value + 'value' + + """ + _log.debug("About to wait on key %s, index %s", key, index) + if index: + return self.read(key, wait=True, waitIndex=index, timeout=timeout, + recursive=recursive) + else: + return self.read(key, wait=True, timeout=timeout, + recursive=recursive) + + def eternal_watch(self, key, index=None, recursive=None): + """ + Generator that will yield changes from a key. + Note that this method will block forever until an event is generated. + + Args: + key (str): Key to subcribe to. + index (int): Index from where the changes will be received. + + Yields: + client.EtcdResult + + >>> for event in client.eternal_watch('/subcription_key'): + ... 
print event.value + ... + value1 + value2 + + """ + local_index = index + while True: + response = self.watch(key, index=local_index, timeout=0, recursive=recursive) + local_index = response.modifiedIndex + 1 + yield response + + def get_lock(self, *args, **kwargs): + raise NotImplementedError('Lock primitives were removed from etcd 2.0') + + @property + def election(self): + raise NotImplementedError('Election primitives were removed from etcd 2.0') + + def _result_from_response(self, response): + """ Creates an EtcdResult from json dictionary """ + raw_response = response.data + try: + res = json.loads(raw_response.decode('utf-8')) + except (TypeError, ValueError, UnicodeError) as e: + raise etcd.EtcdException( + 'Server response was not valid JSON: %r' % e) + try: + r = etcd.EtcdResult(**res) + if response.status == 201: + r.newKey = True + r.parse_headers(response) + return r + except Exception as e: + raise etcd.EtcdException( + 'Unable to decode server response: %r' % e) + + def _next_server(self, cause=None): + """ Selects the next server in the list, refreshes the server list. """ + _log.debug("Selection next machine in cache. Available machines: %s", + self._machines_cache) + try: + mach = self._machines_cache.pop() + except IndexError: + _log.error("Machines cache is empty, no machines to try.") + raise etcd.EtcdConnectionFailed('No more machines in the cluster', + cause=cause) + else: + _log.info("Selected new etcd server %s", mach) + return mach + + def _wrap_request(payload): + @wraps(payload) + def wrapper(self, path, method, params=None, timeout=None): + response = False + + if timeout is None: + timeout = self.read_timeout + + if timeout == 0: + timeout = None + + if not path.startswith('/'): + raise ValueError('Path does not start with /') + + while not response: + some_request_failed = False + try: + response = payload(self, path, method, + params=params, timeout=timeout) + # Check the cluster ID hasn't changed under us. 
We use + # preload_content=False above so we can read the headers + # before we wait for the content of a watch. + self._check_cluster_id(response) + # Now force the data to be preloaded in order to trigger any + # IO-related errors in this method rather than when we try to + # access it later. + _ = response.data + # urllib3 doesn't wrap all httplib exceptions and earlier versions + # don't wrap socket errors either. + except (HTTPError, HTTPException, socket.error) as e: + if (isinstance(params, dict) and + params.get("wait") == "true" and + isinstance(e, ReadTimeoutError)): + _log.debug("Watch timed out.") + raise etcd.EtcdWatchTimedOut( + "Watch timed out: %r" % e, + cause=e + ) + _log.error("Request to server %s failed: %r", + self._base_uri, e) + if self._allow_reconnect: + _log.info("Reconnection allowed, looking for another " + "server.") + # _next_server() raises EtcdException if there are no + # machines left to try, breaking out of the loop. + self._base_uri = self._next_server(cause=e) + some_request_failed = True + + # if exception is raised on _ = response.data + # the condition for while loop will be False + # but we should retry + response = False + else: + _log.debug("Reconnection disabled, giving up.") + raise etcd.EtcdConnectionFailed( + "Connection to etcd failed due to %r" % e, + cause=e + ) + except etcd.EtcdClusterIdChanged as e: + _log.warning(e) + raise + except: + _log.exception("Unexpected request failure, re-raising.") + raise + + if some_request_failed: + if not self._use_proxies: + # The cluster may have changed since last invocation + self._machines_cache = self.machines + self._machines_cache.remove(self._base_uri) + return self._handle_server_response(response) + return wrapper + + @_wrap_request + def api_execute(self, path, method, params=None, timeout=None): + """ Executes the query. 
""" + url = self._base_uri + path + + if (method == self._MGET) or (method == self._MDELETE): + return self.http.request( + method, + url, + timeout=timeout, + fields=params, + redirect=self.allow_redirect, + headers=self._get_headers(), + preload_content=False) + + elif (method == self._MPUT) or (method == self._MPOST): + return self.http.request_encode_body( + method, + url, + fields=params, + timeout=timeout, + encode_multipart=False, + redirect=self.allow_redirect, + headers=self._get_headers(), + preload_content=False) + else: + raise etcd.EtcdException( + 'HTTP method {} not supported'.format(method)) + + @_wrap_request + def api_execute_json(self, path, method, params=None, timeout=None): + url = self._base_uri + path + json_payload = json.dumps(params) + headers = self._get_headers() + headers['Content-Type'] = 'application/json' + return self.http.urlopen(method, + url, + body=json_payload, + timeout=timeout, + redirect=self.allow_redirect, + headers=headers, + preload_content=False) + + def _check_cluster_id(self, response): + cluster_id = response.getheader("x-etcd-cluster-id") + if not cluster_id: + _log.warning("etcd response did not contain a cluster ID") + return + id_changed = (self.expected_cluster_id and + cluster_id != self.expected_cluster_id) + # Update the ID so we only raise the exception once. + old_expected_cluster_id = self.expected_cluster_id + self.expected_cluster_id = cluster_id + if id_changed: + # Defensive: clear the pool so that we connect afresh next + # time. 
+ self.http.clear() + raise etcd.EtcdClusterIdChanged( + 'The UUID of the cluster changed from {} to ' + '{}.'.format(old_expected_cluster_id, cluster_id)) + + def _handle_server_response(self, response): + """ Handles the server response """ + if response.status in [200, 201]: + return response + + else: + resp = response.data.decode('utf-8') + + # throw the appropriate exception + try: + r = json.loads(resp) + r['status'] = response.status + except (TypeError, ValueError): + # Bad JSON, make a response locally. + r = {"message": "Bad response", + "cause": str(resp)} + etcd.EtcdError.handle(r) + + def _get_headers(self): + if self.username and self.password: + credentials = ':'.join((self.username, self.password)) + return urllib3.make_headers(basic_auth=credentials) + return {} diff --git a/cluster_manage/etcd/lock.py b/cluster_manage/etcd/lock.py new file mode 100644 index 00000000000..a77aaa766a1 --- /dev/null +++ b/cluster_manage/etcd/lock.py @@ -0,0 +1,178 @@ +import logging +import etcd +import uuid + +_log = logging.getLogger(__name__) + +class Lock(object): + """ + Locking recipe for etcd, inspired by the kazoo recipe for zookeeper + """ + + def __init__(self, client, lock_name): + self.client = client + self.name = lock_name + # props to Netflix Curator for this trick. It is possible for our + # create request to succeed on the server, but for a failure to + # prevent us from getting back the full path name. We prefix our + # lock name with a uuid and can check for its presence on retry. 
+ self._uuid = uuid.uuid4().hex + self.path = "{}/{}".format(client.lock_prefix, lock_name) + self.is_taken = False + self._sequence = None + _log.debug("Initiating lock for %s with uuid %s", self.path, self._uuid) + + @property + def uuid(self): + """ + The unique id of the lock + """ + return self._uuid + + @uuid.setter + def uuid(self, value): + old_uuid = self._uuid + self._uuid = value + if not self._find_lock(): + _log.warn("The hand-set uuid was not found, refusing") + self._uuid = old_uuid + raise ValueError("Inexistent UUID") + + @property + def is_acquired(self): + """ + tells us if the lock is acquired + """ + if not self.is_taken: + _log.debug("Lock not taken") + return False + try: + self.client.read(self.lock_key) + return True + except etcd.EtcdKeyNotFound: + _log.warn("Lock was supposedly taken, but we cannot find it") + self.is_taken = False + return False + + def acquire(self, blocking=True, lock_ttl=3600, timeout=0): + """ + Acquire the lock. + + :param blocking Block until the lock is obtained, or timeout is reached + :param lock_ttl The duration of the lock we acquired, set to None for eternal locks + :param timeout The time to wait before giving up on getting a lock + """ + # First of all try to write, if our lock is not present. + if not self._find_lock(): + _log.debug("Lock not found, writing it to %s", self.path) + res = self.client.write(self.path, self.uuid, ttl=lock_ttl, append=True) + self._set_sequence(res.key) + _log.debug("Lock key %s written, sequence is %s", res.key, self._sequence) + elif lock_ttl: + # Renew our lock if already here! 
+ self.client.write(self.lock_key, self.uuid, ttl=lock_ttl) + + # now get the owner of the lock, and the next lowest sequence + return self._acquired(blocking=blocking, timeout=timeout) + + def release(self): + """ + Release the lock + """ + if not self._sequence: + self._find_lock() + try: + _log.debug("Releasing existing lock %s", self.lock_key) + self.client.delete(self.lock_key) + except etcd.EtcdKeyNotFound: + _log.info("Lock %s not found, nothing to release", self.lock_key) + pass + finally: + self.is_taken = False + + def __enter__(self): + """ + You can use the lock as a contextmanager + """ + self.acquire(blocking=True, lock_ttl=None) + return self + + def __exit__(self, type, value, traceback): + self.release() + return False + + def _acquired(self, blocking=True, timeout=0): + locker, nearest = self._get_locker() + self.is_taken = False + if self.lock_key == locker: + _log.debug("Lock acquired!") + # We own the lock, yay! + self.is_taken = True + return True + else: + self.is_taken = False + if not blocking: + return False + # Let's look for the lock + watch_key = nearest.key + _log.debug("Lock not acquired, now watching %s", watch_key) + t = max(0, timeout) + while True: + try: + r = self.client.watch(watch_key, timeout=t, index=nearest.modifiedIndex + 1) + _log.debug("Detected variation for %s: %s", r.key, r.action) + return self._acquired(blocking=True, timeout=timeout) + except etcd.EtcdKeyNotFound: + _log.debug("Key %s not present anymore, moving on", watch_key) + return self._acquired(blocking=True, timeout=timeout) + except etcd.EtcdLockExpired as e: + raise e + except etcd.EtcdException: + _log.exception("Unexpected exception") + + @property + def lock_key(self): + if not self._sequence: + raise ValueError("No sequence present.") + return self.path + '/' + str(self._sequence) + + def _set_sequence(self, key): + self._sequence = key.replace(self.path, '').lstrip('/') + + def _find_lock(self): + if self._sequence: + try: + res = 
self.client.read(self.lock_key)
+                self._uuid = res.value
+                return True
+            except etcd.EtcdKeyNotFound:
+                return False
+        elif self._uuid:
+            try:
+                for r in self.client.read(self.path, recursive=True).leaves:
+                    if r.value == self._uuid:
+                        self._set_sequence(r.key)
+                        return True
+            except etcd.EtcdKeyNotFound:
+                pass
+        return False
+
+    def _get_locker(self):
+        results = [res for res in
+                   self.client.read(self.path, recursive=True).leaves]
+        if not self._sequence:
+            self._find_lock()
+        l = sorted([r.key for r in results])
+        _log.debug("Lock keys found: %s", l)
+        try:
+            i = l.index(self.lock_key)
+            if i == 0:
+                _log.debug("No key before our one, we are the locker")
+                return (l[0], None)
+            else:
+                _log.debug("Locker: %s, key to watch: %s", l[0], l[i-1])
+                return (l[0], next(x for x in results if x.key == l[i-1]))
+        except ValueError:
+            # Something very wrong is going on, most probably
+            # our lock has expired
+            raise etcd.EtcdLockExpired(u"Lock not found")
diff --git a/cluster_manage/flash_cluster_manager.py b/cluster_manage/flash_cluster_manager.py
new file mode 100644
index 00000000000..27ce48b77bb
--- /dev/null
+++ b/cluster_manage/flash_cluster_manager.py
@@ -0,0 +1,275 @@
+#!/usr/bin/python3
+import logging
+import os
+import socket
+import sys
+import time
+
+import conf
+import etcd
+import flash_http_client
+import placement_rule
+import tidb_tools
+import util
+from pd_client import PDClient
+
+terminal: bool = False
+
+
+def handle_receive_signal(signal_number, _):
+    print('Received signal: ', signal_number)
+    global terminal
+    terminal = True  # was `terminal = handle_receive_signal`: a bool flag must be set to True, not to the handler function (any function object is truthy, so the bug also type-confuses the flag)
+
+
+def get_host():
+    return socket.gethostbyname(socket.gethostname())
+
+
+class TiFlashClusterNotMaster(Exception):
+    def __init__(self):
+        pass
+
+
+def wrap_try_get_lock(func):
+    def wrap_func(manager, *args, **kwargs):
+        manager.try_get_lock()
+        role, ts = manager.state
+        if role == TiFlashClusterManager.ROLE_MASTER:
+            return func(manager, *args, **kwargs)
+        else:
+            raise TiFlashClusterNotMaster()
+
+ return wrap_func + + +def wrap_add_task(interval_func): + def wrap_func(func): + def _wrap_func(manager, *args, **kwargs): + try: + func(manager, *args, **kwargs) + except TiFlashClusterNotMaster: + pass + except Exception as e: + manager.logger.exception(e) + + return _wrap_func + + return wrap_func + + +class Store: + TIFLASH_HTTP_PORT_LABEL = 'tiflash_http_port' + + def __eq__(self, other): + return self.inner == other + + def __str__(self): + return str(self.inner) + + def __init__(self, pd_store): + self.inner = pd_store + address = self.inner['address'] + host, port = address.split(':') + self.ip = socket.gethostbyname(host) + self.address = '{}:{}'.format(self.ip, port) + self.tiflash_http_port = None + self.tiflash_http_address = None + for label in self.inner['labels']: + if label['key'] == Store.TIFLASH_HTTP_PORT_LABEL: + self.tiflash_http_port = int(label['value']) + self.tiflash_http_address = '{}:{}'.format(self.ip, self.tiflash_http_port) + + @property + def id(self): + return self.inner['id'] + + +class Table: + def __init__(self, total_region, flash_region): + self.total_region = total_region + self.flash_region = flash_region + + +class TiFlashClusterManager: + ROLE_INIT = 0 + ROLE_SLAVE = 1 + ROLE_MASTER = 2 + FLASH_LABEL = {'key': 'engine', 'value': 'tiflash'} + + @staticmethod + def compute_cur_store(stores): + for _, store in stores.items(): + ok = store.ip == conf.flash_conf.service_ip + # ok = True + if ok and store.tiflash_http_port == conf.flash_conf.http_port: + return store + + raise Exception("Can not tell current store.\nservice_addr: {},\nall tiflash stores: {}".format( + conf.flash_conf.service_addr, [store.inner for store in stores.values()])) + + def _try_refresh(self): + try: + ori_role = self.state[0] + self.pd_client.etcd_client.refresh_ttl(self.cur_store.address) + self.state = [TiFlashClusterManager.ROLE_MASTER, time.time()] + if ori_role == TiFlashClusterManager.ROLE_INIT: + self.logger.debug('Continue become master') + + 
except etcd.EtcdValueError as e: + self.state = [TiFlashClusterManager.ROLE_SLAVE, time.time()] + self.logger.info('Refresh ttl fail become slave, %s', e.payload['message']) + + except etcd.EtcdKeyNotFound as e: + self.state = [TiFlashClusterManager.ROLE_INIT, 0] + self.try_get_lock() + + def try_get_lock(self): + role, ts = self.state + if role == TiFlashClusterManager.ROLE_INIT: + if self.pd_client.etcd_client.try_init_mutex(self.cur_store.address): + self.state = [TiFlashClusterManager.ROLE_MASTER, time.time()] + self.logger.info('After init, become master') + else: + self.state = [TiFlashClusterManager.ROLE_SLAVE, time.time()] + self.logger.info('After init, become slave') + elif role == TiFlashClusterManager.ROLE_SLAVE: + cur = time.time() + if cur >= ts + conf.flash_conf.cluster_master_ttl: + self.state = [TiFlashClusterManager.ROLE_INIT, 0] + self.logger.info('Timeout, become init') + self.try_get_lock() + else: + cur = time.time() + if cur >= ts + conf.flash_conf.cluster_refresh_interval: + self._try_refresh() + + def __init__(self, pd_client: PDClient, tidb_status_addr_list): + self.logger = logging.getLogger('TiFlashManager') + self.tidb_status_addr_list = tidb_status_addr_list + self.pd_client = pd_client + self.stores = {} + self.cur_store = None + self._update_cluster() + + self.state = [TiFlashClusterManager.ROLE_INIT, 0] + self._try_refresh() + self.table_update() + + def _update_cluster(self): + prev_stores = self.stores + self.stores = {store_id: Store(store) for store_id, store in + self.pd_client.get_store_by_labels(self.FLASH_LABEL).items()} + if self.stores != prev_stores and prev_stores: + self.logger.info('Update all tiflash stores: from {} to {}'.format([k.inner for k in prev_stores.values()], + [k.inner for k in self.stores.values()])) + self.cur_store = self.compute_cur_store(self.stores) + + def deal_with_region(self, region): + for peer in region.peers: + if peer.store_id == self.cur_store.id: + assert peer.is_learner + + def 
_check_and_make_rule(self, table, start_key, end_key, all_rules: dict): + rule_id = 'table-{}-r'.format(table['id']) + + need_new_rule = True + if rule_id in all_rules: + rule = all_rules[rule_id] + if rule.override and rule.start_key == start_key and rule.end_key == end_key and rule.label_constraints == [ + {"key": "engine", "op": "in", "values": ["tiflash"]} + ] and rule.location_labels == table["location_labels"] and rule.count == table[ + "replica_count" + ] and rule.role == "learner": + need_new_rule = False + + if need_new_rule: + rules_new = placement_rule.make_rule(rule_id, start_key, end_key, table["replica_count"], + table["location_labels"]) + self.set_rule(util.obj_2_dict(rules_new)) + + all_rules.pop(rule_id, None) + return need_new_rule + + @wrap_try_get_lock + def set_rule(self, rule): + if self.pd_client.set_rule(rule) == 200: + self.logger.info('Set placement rule {}'.format(rule)) + else: + raise Exception('Set placement rule {} fail'.format(rule)) + + def compute_sync_data_process(self, table_id, start_key, end_key): + stats_region: dict = self.pd_client.get_stats_region_by_range_json(start_key, end_key) + region_count = stats_region.get('count', 0) + flash_region_count = flash_http_client.get_region_count_by_table(self.stores.values(), table_id) + return region_count, flash_region_count + + @wrap_try_get_lock + def report_to_tidb(self, table, region_count, flash_region_count): + self.logger.info( + 'report_to_tidb {} region_count: {} flash_region_count: {}'.format(table, region_count, flash_region_count)) + + for idx, address in enumerate(self.tidb_status_addr_list): + try: + r = util.post_http( + '{}/tiflash/replica'.format(address, PDClient.PD_API_PREFIX, PDClient.PD_API_VERSION), + {"id": table['id'], "region_count": region_count, "flash_region_count": flash_region_count}) + if r.status_code == 200: + if idx != 0: + tmp = self.tidb_status_addr_list[0] + self.tidb_status_addr_list[0] = address + self.tidb_status_addr_list[idx] = tmp + return + 
except Exception: + continue + + self.logger.error( + 'all tidb status addr {} can not be used'.format(self.tidb_status_addr_list)) + + @wrap_try_get_lock + def remove_rule(self, rule_id): + self.pd_client.remove_rule(placement_rule.PR_FLASH_GROUP, rule_id) + self.logger.info('Remove placement rule {}'.format(rule_id)) + + @wrap_try_get_lock + def table_update(self): + table_list = tidb_tools.db_flash_replica(self.tidb_status_addr_list) + all_rules = self.pd_client.get_group_rules(placement_rule.PR_FLASH_GROUP) + for table in table_list: + from tikv_util import common + + table_id = table['id'] + st, ed = common.make_table_begin(table_id), common.make_table_end(table_id) + start_key, end_key = st.to_bytes(), ed.to_bytes() + self._check_and_make_rule(table, st.to_pd_key(), ed.to_pd_key(), all_rules) + + if not table['available']: + region_count, flash_region_count = self.compute_sync_data_process(table_id, start_key, end_key) + self.report_to_tidb(table, region_count, flash_region_count) + + for rule in all_rules.values(): + self.remove_rule(rule.id) + + +def main(): + flash_conf = conf.flash_conf + + if not os.path.exists(flash_conf.tmp_path): + os.makedirs(flash_conf.tmp_path) + + logging.basicConfig(filename='{}/flash_cluster_manager.log'.format(flash_conf.tmp_path), level=conf.log_level, + format='%(asctime)s <%(levelname)s> %(name)s: %(message)s') + logging.getLogger("requests").setLevel(logging.WARNING) + logging.getLogger("urllib3").setLevel(logging.WARNING) + + logging.debug('\nCluster Manager Version Info\n{}'.format(conf.version_info)) + + try: + pd_client = PDClient(flash_conf.pd_addrs) + TiFlashClusterManager(pd_client, conf.flash_conf.tidb_status_addr) + except Exception as e: + logging.exception(e) + + +if __name__ == '__main__': + main() diff --git a/cluster_manage/flash_http_client.py b/cluster_manage/flash_http_client.py new file mode 100644 index 00000000000..a8cf24fc202 --- /dev/null +++ b/cluster_manage/flash_http_client.py @@ -0,0 +1,47 @@ 
+#!/usr/bin/python3 +import requests + +import util + + +def curl_flash(address, params): + if type(params) != dict: + params = {'query': params} + r = util.curl_http(address, params) + return r + + +def get_region_count_by_table(store_list, table_id): + from tikv_util import common + + checker = common.CheckRegionCnt() + for store in store_list: + sql = 'DBGInvoke dump_region_table({},true)'.format(table_id) + try: + res = curl_flash(store.tiflash_http_address, sql) + checker.add(res.content) + except requests.exceptions.RequestException: + continue + return checker.compute() + + +def get_regions_by_range(address, start_key, end_key): + sql = "DBGInvoke find_region_by_range(\'{}\',\'{}\', 1)".format(start_key, end_key) + res = curl_flash(address, sql).text + res = res.split('\n')[:-1] + res[1] = res[1].split(' ')[1:-1] + return res + + +def get_region_count_by_range(address, start_key, end_key): + sql = "DBGInvoke find_region_by_range(\'{}\',\'{}\')".format(start_key, end_key) + res = curl_flash(address, sql).text.split('\n') + return int(res[0]) + + +def main(): + pass + + +if __name__ == '__main__': + main() diff --git a/cluster_manage/flash_tools.py b/cluster_manage/flash_tools.py new file mode 100644 index 00000000000..45446adcba7 --- /dev/null +++ b/cluster_manage/flash_tools.py @@ -0,0 +1,36 @@ +#!/usr/bin/python3 +import socket +import toml +import util + + +class FlashConfig: + + def __init__(self, file_path): + self.conf_file_path = file_path + self.conf_toml = toml.load(self.conf_file_path, _dict=dict) + self.pd_addrs = util.compute_addr_list(self.conf_toml['raft']['pd_addr']) + self.http_port = self.conf_toml['http_port'] + self.data_path = self.conf_toml['path'] + self.tmp_path = self.conf_toml['tmp_path'] + + p = self.conf_toml['flash'] + service_addr = p['service_addr'] + host, port = [e.strip() for e in service_addr.split(':')] + self.service_ip = socket.gethostbyname(host) + self.service_addr = '{}:{}'.format(self.service_ip, port) + 
self.tidb_status_addr = util.compute_addr_list(p['tidb_status_addr']) + flash_cluster = p['flash_cluster'] + self.cluster_master_ttl = flash_cluster['master_ttl'] + self.cluster_refresh_interval = min( + int(flash_cluster['refresh_interval']), self.cluster_master_ttl) + self.update_rule_interval = int(flash_cluster['update_rule_interval']) + + +def main(): + conf = FlashConfig('../running/config/config.xml') + pass + + +if __name__ == '__main__': + main() diff --git a/cluster_manage/pd_client.py b/cluster_manage/pd_client.py new file mode 100644 index 00000000000..6dc29abd04a --- /dev/null +++ b/cluster_manage/pd_client.py @@ -0,0 +1,144 @@ +#!/usr/bin/python3 +import logging +from typing import Optional + +import etcd +import uri +import conf +import util + + +class EtcdClient: + FLASH_PREFIX = 'tiflash' + FLASH_CLUSTER_MUTEX_KEY = '{}/cluster/master'.format(FLASH_PREFIX) + + def try_init_mutex(self, cluster_mutex_value): + try: + res = self.client.write(EtcdClient.FLASH_CLUSTER_MUTEX_KEY, + cluster_mutex_value, + ttl=conf.flash_conf.cluster_master_ttl, prevExist=False) + self.logger.info('Try to init master success, ttl: %d, create new key: %s', res.ttl, res.key) + return True + except etcd.EtcdAlreadyExist as e: + self.logger.info('Try to init master fail, %s', e.payload['message']) + return False + + def refresh_ttl(self, cluster_mutex_value): + self.client.refresh(EtcdClient.FLASH_CLUSTER_MUTEX_KEY, conf.flash_conf.cluster_master_ttl, + prevValue=cluster_mutex_value) + + def test_refresh_ttl_wrong_value(self, cluster_mutex_value): + self.refresh_ttl(cluster_mutex_value) + + def __init__(self, host, port): + self.logger = logging.getLogger('etcd.client') + self.client = etcd.Client(host=host, port=port) + + +class PDClient: + PD_API_PREFIX = 'pd/api' + PD_API_VERSION = 'v1' + + def get_all_regions_json(self): + r = util.curl_http('{}/{}/{}/regions'.format(self.leader, PDClient.PD_API_PREFIX, PDClient.PD_API_VERSION)) + return r.json() + + def 
get_regions_by_key_json(self, key: str, limit=16): + r = util.curl_http( + '{}/{}/{}/regions/key'.format(self.leader, PDClient.PD_API_PREFIX, PDClient.PD_API_VERSION), + {'key': key, 'limit': limit}) + return r.json() + + def get_region_by_id_json(self, region_id: int): + r = util.curl_http( + '{}/{}/{}/region/id/{}'.format(self.leader, PDClient.PD_API_PREFIX, PDClient.PD_API_VERSION, region_id)) + return r.json() + + def get_all_stores_json(self): + r = util.curl_http( + '{}/{}/{}/stores'.format(self.leader, PDClient.PD_API_PREFIX, PDClient.PD_API_VERSION)) + return r.json() + + def get_members_json(self, *args): + url = args[0] if args else self.leader + r = util.curl_http( + '{}/{}/{}/members'.format(url, PDClient.PD_API_PREFIX, PDClient.PD_API_VERSION)) + return r.json() + + def get_stats_region_by_range_json(self, start_key, end_key): + r = util.curl_http( + '{}/{}/{}/stats/region'.format(self.leader, PDClient.PD_API_PREFIX, PDClient.PD_API_VERSION), + {'start_key': start_key, 'end_key': end_key}, + ) + return r.json() + + def get_group_rules(self, group): + r = util.curl_http( + '{}/{}/{}/config/rules/group/{}'.format(self.leader, PDClient.PD_API_PREFIX, PDClient.PD_API_VERSION, + group)) + res = r.json() + res = res if res is not None else {} + for e in res: + if not isinstance(e, dict): + raise Exception('Got placement rules fail: {}'.format(r.text)) + from placement_rule import PlacementRule + return {e['id']: PlacementRule(**e) for e in res} + + def get_all_rules(self): + r = util.curl_http( + '{}/{}/{}/config/rules'.format(self.leader, PDClient.PD_API_PREFIX, PDClient.PD_API_VERSION)) + res = r.json() + return res if res is not None else {} + + def set_rule(self, rule): + r = util.post_http( + '{}/{}/{}/config/rule'.format(self.leader, PDClient.PD_API_PREFIX, PDClient.PD_API_VERSION), rule) + return r.status_code + + def remove_rule(self, group, rule_id): + r = util.delete_http( + '{}/{}/{}/config/rule/{}/{}'.format(self.leader, PDClient.PD_API_PREFIX, 
PDClient.PD_API_VERSION, group, + rule_id)) + return r.status_code + + def _update_leader_etcd(self): + for url in self.urls: + resp = self.get_members_json(url) + leader = resp.get('leader', {}) + client_urls = leader.get('client_urls', []) + if client_urls: + _client_urls = [] + for member in resp.get('members', {}): + _client_urls.extend(member.get('client_urls', [])) + self.urls = _client_urls + self.leader = uri.URI(client_urls[0]).authority + _etcd_leader_uri = uri.URI(resp.get('etcd_leader', {}).get('client_urls', [])[0]) + self.etcd_client = EtcdClient(_etcd_leader_uri.host, _etcd_leader_uri.port) + return + raise Exception("can not find pd leader") + + def get_store_by_labels(self, flash_label): + res = {} + all_stores = self.get_all_stores_json() + for store in all_stores['stores']: + store = store['store'] + for label in store.get('labels', []): + if label == flash_label and store.get('state_name') == 'Up': + res[store['id']] = store + return res + + def __init__(self, urls): + self.logger = logging.getLogger('pd.client') + + self.urls = urls + self.leader = "" + self.etcd_client: Optional[EtcdClient] = None + self._update_leader_etcd() + + +def main(): + pass + + +if __name__ == '__main__': + main() diff --git a/cluster_manage/placement_rule.py b/cluster_manage/placement_rule.py new file mode 100644 index 00000000000..3845abf4e16 --- /dev/null +++ b/cluster_manage/placement_rule.py @@ -0,0 +1,50 @@ +#!/usr/bin/python3 + +import util + +PR_FLASH_GROUP = "tiflash" + +base_rule = { + "group_id": PR_FLASH_GROUP, + "id": "", + "index": 0, + "override": True, + "start_key": None, + "end_key": None, + "role": "learner", + "count": 2, + "label_constraints": [ + {"key": "engine", "op": "in", "values": ["tiflash"]} + ], + "location_labels": None +} + + +class PlacementRule: + def __init__(self, **entries): + self.__dict__.update(entries) + if not hasattr(self, 'location_labels'): + self.location_labels = [] + + +def make_rule(rid: str, start_key, end_key, count, 
location_labels): + rule = PlacementRule(**base_rule) + rule.id = rid + rule.start_key = start_key + rule.end_key = end_key + rule.count = count + rule.location_labels = location_labels + return rule + + +def get_group_rules(group="tiflash"): + return [] + + +def main(): + rule = make_rule("1", b'1', b'2', 2, ['host']) + print(util.obj_2_dict(rule)) + + +if __name__ == '__main__': + main() diff --git a/cluster_manage/release.sh b/cluster_manage/release.sh new file mode 100755 index 00000000000..c29542e46eb --- /dev/null +++ b/cluster_manage/release.sh @@ -0,0 +1,52 @@ +#!/bin/bash + +set -ue + +pushd tikv_util +./build.sh + +popd + +python3 check_lib.py + +if [ $? != 0 ]; then + echo "check lib fail" + exit -1 +else + echo "check lib success" +fi + +commit_ts=`git log -1 --format="%ct"` + +if [ "`uname`" == "Darwin" ]; then + commit_time=`date -r $commit_ts +"%Y-%m-%d %H:%M:%S"` +else + commit_time=`date -d @$commit_ts +"%Y-%m-%d %H:%M:%S"` +fi + +git_hash=`git log -1 --format="%H"` +git_branch=`git symbolic-ref --short HEAD` +version_file='version.py' +git_hash_info="git_hash = '$git_hash'" +overwrite="true" + +if [ -f ${version_file} ]; then + tmp_hash=`head -n 1 version.py` + if [ "$tmp_hash" == "$git_hash_info" ]; then + overwrite="false" + fi +fi + +if [ $overwrite == "true" ]; then + echo "start to overwrite $version_file" + echo "$git_hash_info" > $version_file + echo "commit_time = '$commit_time'" >> $version_file + echo "git_branch = '$git_branch'" >> $version_file +fi + +echo "" +echo "Cluster Manager Version Info" +cat $version_file +echo "" + +pyinstaller flash_cluster_manager.py -y diff --git a/cluster_manage/tidb_tools.py b/cluster_manage/tidb_tools.py new file mode 100644 index 00000000000..9bf64a2e762 --- /dev/null +++ b/cluster_manage/tidb_tools.py @@ -0,0 +1,57 @@ +#!/usr/bin/python3 + +# https://github.com/pingcap/tidb/blob/master/docs/tidb_http_api.md + +import util + + +def curl_tidb(address, uri): + r = util.curl_http('{}{}'.format(address, 
uri)) + if r.status_code == 200: + return r.json() + else: + return None + + +def status(address): + return curl_tidb(address, '/status') + + +def table_by_id(address, table_id): + return curl_tidb(address, '/schema?table_id={}'.format(table_id)) + + +def db_info(address, table_id): + return curl_tidb(address, '/db-table/{}'.format(table_id)) + + +def db_schema(address, db): + return curl_tidb(address, '/schema/{}'.format(db)) + + +def db_all_schema(address): + return curl_tidb(address, '/schema') + + +def db_flash_replica(tidb_status_addr_list): + for idx, address in enumerate(tidb_status_addr_list): + try: + r = curl_tidb(address, '/tiflash/replica') + if r is not None: + if idx != 0: + tmp = tidb_status_addr_list[0] + tidb_status_addr_list[0] = address + tidb_status_addr_list[idx] = tmp + return r + except Exception: + continue + + raise Exception('all tidb status addr {} can not be used'.format(tidb_status_addr_list)) + + +def main(): + pass + + +if __name__ == '__main__': + main() diff --git a/cluster_manage/tikv_util/.clang-format b/cluster_manage/tikv_util/.clang-format new file mode 100644 index 00000000000..81ea492c437 --- /dev/null +++ b/cluster_manage/tikv_util/.clang-format @@ -0,0 +1,5 @@ +--- +BasedOnStyle: Google +DerivePointerAlignment: false +PointerAlignment: Middle +... 
diff --git a/cluster_manage/tikv_util/.gitignore b/cluster_manage/tikv_util/.gitignore new file mode 100644 index 00000000000..9d22eb46a9c --- /dev/null +++ b/cluster_manage/tikv_util/.gitignore @@ -0,0 +1,2 @@ +*.o +*.so diff --git a/cluster_manage/tikv_util/__init__.py b/cluster_manage/tikv_util/__init__.py new file mode 100644 index 00000000000..8b137891791 --- /dev/null +++ b/cluster_manage/tikv_util/__init__.py @@ -0,0 +1 @@ + diff --git a/cluster_manage/tikv_util/build.sh b/cluster_manage/tikv_util/build.sh new file mode 100755 index 00000000000..66bbfc837d2 --- /dev/null +++ b/cluster_manage/tikv_util/build.sh @@ -0,0 +1,11 @@ +#!/bin/bash + +is_force="$1" + +if [[ "$is_force" == "true" ]]; then + is_force="--force" +else + is_force="" +fi + +python3 setup.py build_ext --inplace ${is_force} diff --git a/cluster_manage/tikv_util/check_region_cnt.cc b/cluster_manage/tikv_util/check_region_cnt.cc new file mode 100644 index 00000000000..d1ee0845c26 --- /dev/null +++ b/cluster_manage/tikv_util/check_region_cnt.cc @@ -0,0 +1,21 @@ +#include "check_region_cnt.h" +#include + +void CheckRegionCnt::Add(std::string_view s) { + char region_id[20]; + size_t id_len = 0; + + auto p1 = s.find('\n'); + auto p2 = s.find('\n', p1 + 1); + for (size_t i = p1 + 1; i < p2; ++i) { + if (s[i] == ' ') { + region_id[id_len] = 0; + data_.emplace(std::atoll(region_id)); + id_len = 0; + } else { + region_id[id_len++] = s[i]; + } + } +} + +int CheckRegionCnt::Compute() { return data_.size(); } \ No newline at end of file diff --git a/cluster_manage/tikv_util/check_region_cnt.h b/cluster_manage/tikv_util/check_region_cnt.h new file mode 100644 index 00000000000..994abc31be4 --- /dev/null +++ b/cluster_manage/tikv_util/check_region_cnt.h @@ -0,0 +1,20 @@ +#pragma once + +#include +#include +#include "type.h" + +namespace pybind11 { +class bytes; +} // namespace pybind11 + +namespace py = pybind11; + +class CheckRegionCnt { + public: + void Add(std::string_view); + int Compute(); + + 
private: + std::unordered_set data_; +}; diff --git a/cluster_manage/tikv_util/codec.h b/cluster_manage/tikv_util/codec.h new file mode 100644 index 00000000000..d6eb11c4a71 --- /dev/null +++ b/cluster_manage/tikv_util/codec.h @@ -0,0 +1,181 @@ +#pragma once + +#include "exception_util.h" +#include "type.h" + +#include +#include + +static const size_t kEncGroupSize = 8; +static const uint8_t kEncMarker = static_cast(0xff); +static const char kEncAscPadding[kEncGroupSize] = {0}; + +static const char kTablePrefix = 't'; +static const char * kRecordPrefixSep = "_r"; +static const char * kRecordPrefixEnd = "_s"; + +static const UInt64 kSignMark = (UInt64)1 << 63u; + +inline void EncodeBytes(const std::string & ori_str, std::stringstream & ss) { + const size_t len = ori_str.size(); + size_t index = 0; + while (index <= len) { + size_t remain = len - index; + size_t pad = 0; + if (remain >= kEncGroupSize) { + ss.write(ori_str.data() + index, kEncGroupSize); + } else { + pad = kEncGroupSize - remain; + ss.write(ori_str.data() + index, remain); + ss.write(kEncAscPadding, pad); + } + ss.put(static_cast(kEncMarker - (uint8_t)pad)); + index += kEncGroupSize; + } +} + +inline std::string Encode(const std::string & ori_str) { + std::stringstream ss; + EncodeBytes(ori_str, ss); + return ss.str(); +} + +inline UInt64 EndianReverse(UInt64 x) { + UInt64 step32, step16; + step32 = x << 32u | x >> 32u; + step16 = (step32 & 0x0000FFFF0000FFFFULL) << 16u | + (step32 & 0xFFFF0000FFFF0000ULL) >> 16u; + return (step16 & 0x00FF00FF00FF00FFULL) << 8u | + (step16 & 0xFF00FF00FF00FF00ULL) >> 8u; +} + +inline UInt64 ToBigEndian(const UInt64 x) { return EndianReverse(x); } + +inline UInt64 EncodeUInt64(const UInt64 x) { return ToBigEndian(x); } + +inline UInt64 EncodeInt64(const Int64 x) { + return EncodeUInt64(static_cast(x) ^ kSignMark); +} + +inline UInt64 EncodeUInt64Desc(const UInt64 x) { return EncodeUInt64(~x); } + +inline UInt64 DecodeUInt64(const UInt64 x) { return ToBigEndian(x); } + 
+inline UInt64 DecodeUInt64Desc(const UInt64 x) { return ~DecodeUInt64(x); } + +inline Int64 DecodeInt64(const UInt64 x) { + return static_cast(DecodeUInt64(x) ^ kSignMark); +} + +inline std::string GenKeyByTable(const TableId tableId) { + std::string key(1 + 8, 0); + memcpy(key.data(), &kTablePrefix, 1); + auto big_endian_table_id = EncodeInt64(tableId); + memcpy(key.data() + 1, reinterpret_cast(&big_endian_table_id), + 8); + return Encode(key); +} + +template +inline std::string GenKey(const TableId tableId) { + return GenKeyByTable(tableId + 1); +} + +template <> +inline std::string GenKey(const TableId tableId) { + std::string key(1 + 8 + 2, 0); + memcpy(key.data(), &kTablePrefix, 1); + auto big_endian_table_id = EncodeInt64(tableId); + memcpy(key.data() + 1, reinterpret_cast(&big_endian_table_id), + 8); + memcpy(key.data() + 1 + 8, kRecordPrefixSep, 2); + return Encode(key); +} + +inline std::string GenKey(const TableId tableId, const HandleId handle) { + std::string key(1 + 8 + 2 + 8, 0); + memcpy(key.data(), &kTablePrefix, 1); + auto big_endian_table_id = EncodeInt64(tableId); + memcpy(key.data() + 1, reinterpret_cast(&big_endian_table_id), + 8); + memcpy(key.data() + 1 + 8, kRecordPrefixSep, 2); + auto big_endian_handle_id = EncodeInt64(handle); + memcpy(key.data() + 1 + 8 + 2, + reinterpret_cast(&big_endian_handle_id), 8); + return Encode(key); +} + +inline std::string ToPdKey(const char * key, const size_t len) { + std::string res(len * 2, 0); + size_t i = 0; + for (size_t k = 0; k < len; ++k) { + uint8_t o = key[k]; + res[i++] = o / 16; + res[i++] = o % 16; + } + + for (char & re : res) { + if (re < 10) + re = re + '0'; + else + re = re - 10 + 'A'; + } + return res; +} + +inline std::string ToPdKey(const std::string & key) { + return ToPdKey(key.data(), key.size()); +} + +inline std::string FromPdKey(const char * key, const size_t len) { + std::string res(len / 2, 0); + for (size_t k = 0; k < len; k += 2) { + int s[2]; + + for (size_t i = 0; i < 2; ++i) 
{ + char p = key[k + i]; + if (p >= 'A') + s[i] = p - 'A' + 10; + else + s[i] = p - '0'; + } + + res[k / 2] = s[0] * 16 + s[1]; + } + return res; +} + +inline bool CheckKeyPaddingValid(const char * ptr, const UInt8 pad_size) { + UInt64 p = (*reinterpret_cast(ptr)) >> + ((kEncGroupSize - pad_size) * 8); + return p == 0; +} + +inline std::tuple DecodeTikvKeyFull( + const std::string & key) { + const size_t chunk_len = kEncGroupSize + 1; + std::string res; + res.reserve(key.size() / chunk_len * kEncGroupSize); + for (const char * ptr = key.data();; ptr += chunk_len) { + if (ptr + chunk_len > key.size() + key.data()) + throw Exception("Unexpected eof"); + auto marker = (UInt8) * (ptr + kEncGroupSize); + UInt8 pad_size = (kEncMarker - marker); + if (pad_size == 0) { + res.append(ptr, kEncGroupSize); + continue; + } + if (pad_size > kEncGroupSize) throw Exception("Key padding"); + res.append(ptr, kEncGroupSize - pad_size); + + if (!CheckKeyPaddingValid(ptr, pad_size)) + throw Exception("Key padding, wrong end"); + + // raw string and the offset of remaining string such as timestamp + return std::make_tuple(std::move(res), ptr - key.data() + chunk_len); + } +} + +inline std::string DecodeTikvKey(const std::string & key) { + return std::get<0>(DecodeTikvKeyFull(key)); +} diff --git a/cluster_manage/tikv_util/exception_util.h b/cluster_manage/tikv_util/exception_util.h new file mode 100644 index 00000000000..5bf9332a483 --- /dev/null +++ b/cluster_manage/tikv_util/exception_util.h @@ -0,0 +1,14 @@ +#pragma once + +#include +#include + +class Exception : std::exception { + public: + const char * what() const noexcept override { return msg_.data(); } + + explicit Exception(std::string msg) : msg_(std::move(msg)) {} + + private: + std::string msg_; +}; \ No newline at end of file diff --git a/cluster_manage/tikv_util/export.cc b/cluster_manage/tikv_util/export.cc new file mode 100644 index 00000000000..367c2f31359 --- /dev/null +++ b/cluster_manage/tikv_util/export.cc @@ -0,0 
+1,28 @@ +#include "check_region_cnt.h" +#include "tikv_key.h" + +#include +namespace py = pybind11; + +PYBIND11_MODULE(common, m) { + py::class_(m, "TikvKey") + .def(py::init()) + .def("size", &TikvKey::Size) + .def("compare", &TikvKey::Compare) + .def("to_bytes", &TikvKey::ToBytes) + .def("to_pd_key", &TikvKey::ToPdKey); + + py::class_(m, "CheckRegionCnt") + .def(py::init()) + .def("add", &CheckRegionCnt::Add) + .def("compute", &CheckRegionCnt::Compute); + + m.def("make_table_begin", &MakeTableBegin); + m.def("make_table_end", &MakeTableEnd); + m.def("make_whole_table_begin", &MakeWholeTableBegin); + m.def("make_whole_table_end", &MakeWholeTableEnd); + m.def("make_table_handle", &MakeTableHandle); + m.def("decode_pd_key", &DecodePdKey); + m.def("get_table_id", &GetTableId); + m.def("get_handle", &GetHandle); +} diff --git a/cluster_manage/tikv_util/format.sh b/cluster_manage/tikv_util/format.sh new file mode 100755 index 00000000000..1ed20592d9f --- /dev/null +++ b/cluster_manage/tikv_util/format.sh @@ -0,0 +1,9 @@ +#!/bin/bash + +clang_format=$(bash -c "compgen -c clang-format | grep 'clang-format' | sort --version-sort | head -n1") + +if [[ ! -z ${clang_format} ]]; then + find ./ -name "*.h" -or -name "*.cc" | xargs ${clang_format} -i +else + echo clang-format missing. try to install +fi diff --git a/cluster_manage/tikv_util/setup.py b/cluster_manage/tikv_util/setup.py new file mode 100644 index 00000000000..51ebaec6d35 --- /dev/null +++ b/cluster_manage/tikv_util/setup.py @@ -0,0 +1,111 @@ +import os + +from setuptools import setup, Extension +from setuptools.command.build_ext import build_ext +import sys +import setuptools + +__version__ = '0.0.1' + + +class GetPybindInclude(object): + """Helper class to determine the pybind11 include path + + The purpose of this class is to postpone importing pybind11 + until it is actually installed, so that the ``get_include()`` + method can be invoked. 
""" + + def __init__(self, user=False): + self.user = user + + def __str__(self): + import pybind11 + return pybind11.get_include(self.user) + + +ext_modules = [ + Extension( + 'common', + sources=['tikv_key.cc', 'export.cc', 'check_region_cnt.cc'], + include_dirs=[ + GetPybindInclude(), + GetPybindInclude(user=True) + ], + language='c++' + ), +] + + +# As of Python 3.6, CCompiler has a `has_flag` method. +# cf http://bugs.python.org/issue26689 +def has_flag(compiler, flagname): + """Return a boolean indicating whether a flag name is supported on + the specified compiler. + """ + import tempfile + with tempfile.NamedTemporaryFile('w', suffix='.cpp') as f: + f.write('int main (int argc, char **argv) { return 0; }') + try: + compiler.compile([f.name], extra_postargs=[flagname]) + except setuptools.distutils.errors.CompileError: + return False + return True + + +def cpp_flag(compiler): + """Return the -std=c++[11/14/17] compiler flag. + + The newer version is prefered over c++11 (when it is available). 
+ """ + flags = ['-std=c++17', '-std=c++14', '-std=c++11'] + + for flag in flags: + if has_flag(compiler, flag): + return flag + + raise RuntimeError('Unsupported compiler -- at least C++11 support ' + 'is needed!') + + +class BuildExt(build_ext): + """A custom build extension for adding compiler-specific options.""" + c_opts = { + 'unix': [], + } + l_opts = { + 'unix': [], + } + + if sys.platform == 'darwin': + darwin_opts = ['-stdlib=libc++', '-mmacosx-version-min=10.14'] + c_opts['unix'] += darwin_opts + l_opts['unix'] += darwin_opts + os.environ["CC"] = "clang" + os.environ["CXX"] = "clang" + + def build_extensions(self): + ct = self.compiler.compiler_type + opts = self.c_opts.get(ct, []) + link_opts = self.l_opts.get(ct, []) + if ct == 'unix': + opts.append('-DVERSION_INFO="%s"' % self.distribution.get_version()) + opts.append(cpp_flag(self.compiler)) + if has_flag(self.compiler, '-fvisibility=hidden'): + opts.append('-fvisibility=hidden') + for ext in self.extensions: + ext.extra_compile_args = opts + ext.extra_link_args = link_opts + build_ext.build_extensions(self) + + +setup( + name='tikv_util', + version=__version__, + author='Tong Zhigao', + author_email='tongzhigao@pingcap.com', + ext_modules=ext_modules, + install_requires=['pybind11>=2.3'], + setup_requires=['pybind11>=2.3'], + cmdclass={'build_ext': BuildExt}, + zip_safe=False, +) diff --git a/cluster_manage/tikv_util/test.py b/cluster_manage/tikv_util/test.py new file mode 100644 index 00000000000..47cbacb37ce --- /dev/null +++ b/cluster_manage/tikv_util/test.py @@ -0,0 +1,63 @@ +#!/usr/bin/python3 + +import unittest + +from tikv_util import common + + +class TestTikvKey(unittest.TestCase): + + def test_pd_tikv_key(self): + pd_key = '7480000000000000FF2D5F728000000000FF3921010000000000FA' + ori_tikv_key = b't\x80\x00\x00\x00\x00\x00\x00\xff-_r\x80\x00\x00\x00\x00\xff9!\x01\x00\x00\x00\x00\x00\xfa' + + self.assertEqual(len(ori_tikv_key) * 2, len(pd_key)) + + tikv_key = common.TikvKey(ori_tikv_key) + 
self.assertEqual(len(ori_tikv_key), tikv_key.size()) + self.assertEqual(ori_tikv_key, tikv_key.to_bytes()) + self.assertEqual(pd_key, tikv_key.to_pd_key()) + + p = common.decode_pd_key(pd_key, len(pd_key)) + self.assertEqual(p, ori_tikv_key) + + def test_start_end_of_table(self): + b1, e1 = common.make_table_begin(123), common.make_table_end(123) + self.assertEqual(b1.to_pd_key(), '7480000000000000FF7B5F720000000000FA') + self.assertEqual(e1.to_pd_key(), '7480000000000000FF7C00000000000000F8') + self.assertTrue(b1.compare(e1) < 0) + b2, e2 = common.make_table_begin(124), common.make_table_end(124) + self.assertTrue(e1.compare(b2) < 0) + b3, e3 = common.make_whole_table_begin(124), common.make_whole_table_end(124) + self.assertTrue(b3.compare(e1) == 0) + self.assertTrue(e3.compare(common.make_whole_table_begin(125)) == 0) + + def test_table_handle(self): + b, e = common.make_table_begin(123), common.make_table_end(123) + mx = common.make_table_handle(123, 9223372036854775807) + mn = common.make_table_handle(123, -9223372036854775808) + self.assertTrue(b.compare(mn) < 0) + self.assertTrue(mn.compare(mx) < 0) + self.assertTrue(mx.compare(e) < 0) + self.assertTrue(mx.compare(mx) == 0) + self.assertEqual(common.get_table_id(mx), 123) + self.assertEqual(common.get_handle(mx), 9223372036854775807) + self.assertEqual(common.get_handle(mn), -9223372036854775808) + + def test_region_cnt(self): + s1 = b'3\n1000 1234 7 \n' + s2 = b'3\n1000 7 9 \n' + c = common.CheckRegionCnt() + c.add(s1) + c.add(s2) + r = c.compute() + self.assertEqual(4, r) + + def test_exception(self): + o = b'12345' + k = common.TikvKey(o) + try: + common.get_table_id(k) + self.assertTrue(False) + except RuntimeError: + pass diff --git a/cluster_manage/tikv_util/tikv_key.cc b/cluster_manage/tikv_util/tikv_key.cc new file mode 100644 index 00000000000..8ffe3c346f4 --- /dev/null +++ b/cluster_manage/tikv_util/tikv_key.cc @@ -0,0 +1,66 @@ +#include "tikv_key.h" +#include +#include "codec.h" +namespace py = 
pybind11; + +TikvKey::TikvKey(String && s) : key_(std::move(s)) {} +TikvKey::TikvKey(std::string_view s) : key_(s) {} + +TikvKey::TikvKey(TikvKey && kv) : key_(kv.key_) {} + +int TikvKey::Compare(const TikvKey & k) { return key().compare(k.key()); } + +size_t TikvKey::Size() const { return key().size(); } + +TikvKey::~TikvKey() {} + +py::bytes TikvKey::ToBytes() const { return key(); } + +py::str TikvKey::ToPdKey() const { return ::ToPdKey(key_); } + +const TikvKey::String & TikvKey::key() const { return key_; } + +py::bytes DecodePdKey(const char * s, const size_t len) { + return FromPdKey(s, len); +} + +TikvKey MakeTableBegin(const TableId table_id) { + return TikvKey(GenKey(table_id)); +} + +TikvKey MakeTableEnd(const TableId table_id) { + return TikvKey(GenKey(table_id)); +} + +TikvKey MakeWholeTableBegin(const TableId table_id) { + return TikvKey(GenKeyByTable(table_id)); +} + +TikvKey MakeWholeTableEnd(const TableId table_id) { + return TikvKey(GenKeyByTable(table_id + 1)); +} + +TikvKey MakeTableHandle(const TableId table_id, const HandleId handle) { + return TikvKey(GenKey(table_id, handle)); +} + +template +inline T Read(const char * s) { + return *(reinterpret_cast(s)); +} + +TableId GetTableIdRaw(const std::string & key) { + return DecodeInt64(Read(key.data() + 1)); +} + +HandleId GetHandleRaw(const std::string & key) { + return DecodeInt64(Read(key.data() + 1 + 8 + 2)); +} + +TableId GetTableId(const TikvKey & key) { + return GetTableIdRaw(DecodeTikvKey(key.key())); +} + +HandleId GetHandle(const TikvKey & key) { + return GetHandleRaw(DecodeTikvKey(key.key())); +} \ No newline at end of file diff --git a/cluster_manage/tikv_util/tikv_key.h b/cluster_manage/tikv_util/tikv_key.h new file mode 100644 index 00000000000..379d2f16153 --- /dev/null +++ b/cluster_manage/tikv_util/tikv_key.h @@ -0,0 +1,49 @@ +#pragma once + +#include +#include "type.h" + +namespace pybind11 { +class bytes; +class str; +} // namespace pybind11 + +namespace py = pybind11; + +class 
TikvKey { + public: + using String = std::string; + + TikvKey() = default; + explicit TikvKey(String && s); + explicit TikvKey(std::string_view); + TikvKey(const TikvKey &) = delete; + TikvKey(TikvKey && kv); + + int Compare(const TikvKey & k); + + size_t Size() const; + + ~TikvKey(); + + py::bytes ToBytes() const; + + py::str ToPdKey() const; + + const String & key() const; + + private: + const String key_; +}; + +TikvKey MakeTableBegin(const TableId table_id); +TikvKey MakeTableEnd(const TableId table_id); +TikvKey MakeTableHandle(const TableId table_id, const HandleId handle); +TikvKey MakeWholeTableBegin(const TableId table_id); +TikvKey MakeWholeTableEnd(const TableId table_id); + +py::bytes DecodePdKey(const char * s, const size_t len); + +TableId GetTableId(const TikvKey & key); + +HandleId GetHandle(const TikvKey & key); \ No newline at end of file diff --git a/cluster_manage/tikv_util/type.h b/cluster_manage/tikv_util/type.h new file mode 100644 index 00000000000..359bf1b4198 --- /dev/null +++ b/cluster_manage/tikv_util/type.h @@ -0,0 +1,9 @@ +#pragma once + +#include + +using Int64 = int64_t; +using UInt64 = uint64_t; +using UInt8 = uint8_t; +using TableId = Int64; +using HandleId = Int64; diff --git a/cluster_manage/timer.py b/cluster_manage/timer.py new file mode 100644 index 00000000000..a150a53683a --- /dev/null +++ b/cluster_manage/timer.py @@ -0,0 +1,53 @@ +#!/usr/bin/python3 + +import heapq +import time + + +class Task: + def __init__(self, func, run_tso): + self.func = func + self.run_tso = run_tso + + def __lt__(self, other): + return self.run_tso < other.run_tso + + def __ge__(self, other): + return self.run_tso >= other.run_tso + + def __le__(self, other): + return self.run_tso <= other.run_tso + + +class Timer: + def __init__(self): + self.queue = [] + + def add(self, func, interval): + heapq.heappush(self.queue, Task(func, time.time() + interval)) + + def run(self): + while True: + if not len(self.queue): + time.sleep(0.2) + continue + task = 
heapq.heappop(self.queue) + now = time.time() + if now < task.run_tso: + time.sleep(task.run_tso - now) + task.func() + + +def main(): + timer = Timer() + + def run(): + print('x') + timer.add(run, time.time() + 2) + + timer.add(run, time.time() + 2) + timer.run() + + +if __name__ == '__main__': + main() diff --git a/cluster_manage/util.py b/cluster_manage/util.py new file mode 100644 index 00000000000..2428ec75863 --- /dev/null +++ b/cluster_manage/util.py @@ -0,0 +1,118 @@ +#!/usr/bin/python3 +import errno +import os +import socket +import time +import fcntl +import logging + +import requests + + +def wrap_run_time(func): + def wrap_func(*args, **kwargs): + bg = time.time() + r = func(*args, **kwargs) + print('time cost {}'.format(time.time() - bg)) + return r + + return wrap_func + + +class FLOCK(object): + def __init__(self, name): + self.obj = open(name, 'w') + self.fd = self.obj.fileno() + + def lock(self): + try: + fcntl.lockf(self.fd, fcntl.LOCK_EX | fcntl.LOCK_NB) + return True + except OSError: + logging.error( + 'Cannot lock file {}. Another instance in same directory is already running'.format(self.obj.name)) + return False + + +def curl_http(uri, params=None): + if params is None: + params = {} + r = requests.get('http://{}'.format(uri), params) + return r + + +def post_http(uri, params): + r = requests.post('http://{}'.format(uri), json=params) + return r + + +def delete_http(uri): + r = requests.delete('http://{}'.format(uri)) + return r + + +def obj_2_dict(obj): + pr = {} + for name in dir(obj): + value = getattr(obj, name) + if not name.startswith('_') and not callable(value): + pr[name] = value + return pr + + +def make_compare_pd_key(key): + return (1, key) if key else (0, '') + + +def net_is_used(ip, port): + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + result = s.connect_ex((ip, port)) + s.close() + return result == 0 + + +def pid_exists(pid): + """Check whether pid exists in the current process table. + UNIX only. 
+ """ + if pid < 0: + return False + if pid == 0: + # According to "man 2 kill" PID 0 refers to every process + # in the process group of the calling process. + # On certain systems 0 is a valid PID but we have no way + # to know that in a portable fashion. + raise ValueError('invalid PID 0') + try: + os.kill(pid, 0) + except OSError as err: + if err.errno == errno.ESRCH: + # ESRCH == No such process + return False + elif err.errno == errno.EPERM: + # EPERM clearly means there's a process to deny access to + return True + else: + # According to "man 2 kill" possible error values are + # (EINVAL, EPERM, ESRCH) + raise + else: + return True + + +def pid_exists2(pid): + if pid == 0: + return True + return pid_exists(pid) + + +def compute_addr_list(addrs): + return [e.strip() for e in addrs.split(',') if e] + + +def main(): + pass + + +if __name__ == '__main__': + main() From 4484efbb4c383d02e6f8e4d7614d8cc81742cd05 Mon Sep 17 00:00:00 2001 From: Tong Zhigao Date: Fri, 13 Dec 2019 12:57:15 +0800 Subject: [PATCH 2/2] update dockerfile --- cluster_manage/README.md | 4 ++-- docker/builder/Dockerfile | 11 +++++++++-- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/cluster_manage/README.md b/cluster_manage/README.md index 672cd97dc54..f3823d997a4 100644 --- a/cluster_manage/README.md +++ b/cluster_manage/README.md @@ -8,7 +8,7 @@ replicas configuration in tidb and try to build flash engine peers or monitor th ## Prepare * install `python3.7`, `pybind11`, `pyinstaller`, `clang-format`, `dnspython`, `uri`, `requests`, `urllib3` -, `toml`, `dnspython`, `C++17` +, `toml`, `C++17` , `setuptools` * use `release.sh` and get dir `dist/flash_cluster_manager/`. 
@@ -23,4 +23,4 @@ replicas configuration in tidb and try to build flash engine peers or monitor th * run cmd: ``` ./dist/flash_cluster_manager/flash_cluster_manager --config [path-to-flash-config.toml] -``` \ No newline at end of file +``` diff --git a/docker/builder/Dockerfile b/docker/builder/Dockerfile index de07494b5f9..3070aabf801 100644 --- a/docker/builder/Dockerfile +++ b/docker/builder/Dockerfile @@ -9,9 +9,10 @@ RUN apt update -y \ liblld-5.0-dev libclang-5.0-dev liblld-5.0 \ build-essential autoconf libtool pkg-config \ libgflags-dev libgtest-dev \ - golang curl \ + golang curl python3-pip \ # For tests: # bash expect python python-lxml python-termcolor curl perl sudo tzdata && curl https://sh.rustup.rs -sSf | sh -s -- -y --profile minimal --default-toolchain nightly \ + && pip3 install pybind11 pyinstaller dnspython uri requests urllib3 toml setuptools \ && rm -rf /var/lib/apt/lists/* RUN git clone https://github.com/grpc/grpc.git && cd grpc && git checkout v1.14.2 && git submodule update --init \ @@ -21,5 +22,11 @@ RUN git clone https://github.com/grpc/grpc.git && cd grpc && git checkout v1.14. && source /root/.profile && ldconfig \ && cd / && mkdir libtiflash-proxy && git clone -b tiflash-proxy-lib https://github.com/solotzg/tikv.git tiflash-proxy && cd /tiflash-proxy \ && make release && cp target/release/libtiflash_proxy.so /libtiflash-proxy && cd / && rm -rf /tiflash-proxy \ - && rustup self uninstall -y + && rustup self uninstall -y \ + && git clone https://github.com/pingcap/tics.git /tics-build \ + && cd tics-build && cd cluster_manage \ + && ./release.sh \ + && rm -rf /flash_cluster_manager && cp -r dist/flash_cluster_manager/ /flash_cluster_manager \ + && cd / && rm -rf tics-build/ +