Skip to content

Commit

Permalink
Merge pull request #428 from hellohaptik/cj_fix_v2_text_es_conn
Browse files Browse the repository at this point in the history
Drop the Elasticsearch client instance if ping fails, and raise an exception
  • Loading branch information
chiragjn authored May 4, 2021
2 parents 9d11979 + abe6604 commit 44c38fa
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 153 deletions.
14 changes: 4 additions & 10 deletions chatbot_ner/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,15 +44,9 @@
ES_DOC_TYPE = os.environ.get('ES_DOC_TYPE', 'data_dictionary')
ES_AUTH_NAME = os.environ.get('ES_AUTH_NAME')
ES_AUTH_PASSWORD = os.environ.get('ES_AUTH_PASSWORD')
ES_BULK_MSG_SIZE = os.environ.get('ES_BULK_MSG_SIZE', '10000')
ES_SEARCH_SIZE = os.environ.get('ES_SEARCH_SIZE', '10000')

try:
ES_BULK_MSG_SIZE = int(ES_BULK_MSG_SIZE)
ES_SEARCH_SIZE = int(ES_SEARCH_SIZE)
except ValueError:
ES_BULK_MSG_SIZE = 1000
ES_SEARCH_SIZE = 1000
ES_BULK_MSG_SIZE = int((os.environ.get('ES_BULK_MSG_SIZE') or '').strip() or '1000')
ES_SEARCH_SIZE = int((os.environ.get('ES_SEARCH_SIZE') or '').strip() or '1000')
ES_REQUEST_TIMEOUT = int((os.environ.get('ES_REQUEST_TIMEOUT') or '').strip() or '20')

ELASTICSEARCH_CRF_DATA_INDEX_NAME = os.environ.get('ELASTICSEARCH_CRF_DATA_INDEX_NAME')
ELASTICSEARCH_CRF_DATA_DOC_TYPE = os.environ.get('ELASTICSEARCH_CRF_DATA_DOC_TYPE')
Expand Down Expand Up @@ -91,7 +85,7 @@
'retry_on_timeout': False,
'max_retries': 1,
'timeout': 20,
'request_timeout': 20,
'request_timeout': ES_REQUEST_TIMEOUT,

# Transfer Specific constants (ignore if only one elasticsearch is setup)
# For detailed explanation datastore.elastic_search.transfer.py
Expand Down
4 changes: 2 additions & 2 deletions chatbot_ner/urls.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
url(r'^v2/number_range_bulk/$', api_v2.number_range),
url(r'^v2/phone_number_bulk/$', api_v2.phone_number),

# Dictionary Read Write
# Deprecated dictionary read write, use entities/data/v1/*
url(r'^entities/get_entity_word_variants', external_api.get_entity_word_variants),
url(r'^entities/update_dictionary', external_api.update_dictionary),

Expand All @@ -54,7 +54,7 @@
url(r'^entities/get_crf_training_data', external_api.get_crf_training_data),
url(r'^entities/update_crf_training_data', external_api.update_crf_training_data),

# Train Crf Model
# Deprecated train crf model
url(r'^entities/train_crf_model', external_api.train_crf_model),

url(r'^entities/languages/v1/(?P<entity_name>.+)$', external_api.entity_language_view),
Expand Down
15 changes: 7 additions & 8 deletions external_api/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def get_entity_word_variants(request):
ner_logger.exception('Error: %s' % error_message)
return HttpResponse(json.dumps(response), content_type='application/json', status=500)

except BaseException as e:
except Exception as e:
response['error'] = str(e)
ner_logger.exception('Error: %s' % e)
return HttpResponse(json.dumps(response), content_type='application/json', status=500)
Expand Down Expand Up @@ -92,7 +92,7 @@ def update_dictionary(request):
ner_logger.exception('Error: %s' % error_message)
return HttpResponse(json.dumps(response), content_type='application/json', status=500)

except BaseException as e:
except Exception as e:
response['error'] = str(e)
ner_logger.exception('Error: %s' % e)
return HttpResponse(json.dumps(response), content_type='application/json', status=500)
Expand Down Expand Up @@ -125,7 +125,7 @@ def transfer_entities(request):
ner_logger.exception('Error: %s' % error_message)
return HttpResponse(json.dumps(response), content_type='application/json', status=500)

except BaseException as e:
except Exception as e:
response['error'] = str(e)
ner_logger.exception('Error: %s' % e)
return HttpResponse(json.dumps(response), content_type='application/json', status=500)
Expand Down Expand Up @@ -166,7 +166,7 @@ def get_crf_training_data(request):
ner_logger.exception('Error: %s' % error_message)
return HttpResponse(json.dumps(response), content_type='application/json', status=500)

except BaseException as e:
except Exception as e:
response['error'] = str(e)
ner_logger.exception('Error: %s' % e)
return HttpResponse(json.dumps(response), content_type='application/json', status=500)
Expand All @@ -193,8 +193,7 @@ def update_crf_training_data(request):
external_api_data = json.loads(request.POST.get(EXTERNAL_API_DATA))
sentences = external_api_data.get(SENTENCES)
entity_name = external_api_data.get(ENTITY_NAME)
DataStore().update_entity_crf_data(entity_name=entity_name,
sentences=sentences)
DataStore().update_entity_crf_data(entity_name=entity_name, sentences=sentences)
response['success'] = True

except (DataStoreSettingsImproperlyConfiguredException,
Expand All @@ -204,7 +203,7 @@ def update_crf_training_data(request):
ner_logger.exception('Error: %s' % error_message)
return HttpResponse(json.dumps(response), content_type='application/json', status=500)

except BaseException as e:
except Exception as e:
response['error'] = str(e)
ner_logger.exception('Error: %s' % e)
return HttpResponse(json.dumps(response), content_type='application/json', status=500)
Expand Down Expand Up @@ -257,7 +256,7 @@ def train_crf_model(request):
ner_logger.exception('Error: %s' % error_message)
return HttpResponse(json.dumps(response), content_type='application/json', status=500)

except BaseException as e:
except Exception as e:
response['error'] = str(e)
ner_logger.exception('Error: %s' % e)
return HttpResponse(json.dumps(response), content_type='application/json', status=500)
Expand Down
124 changes: 42 additions & 82 deletions ner_v2/detectors/textual/elastic_search.py
Original file line number Diff line number Diff line change
@@ -1,98 +1,69 @@
from __future__ import absolute_import

import json
import six

from itertools import chain
import six
from elasticsearch import Elasticsearch

from lib.singleton import Singleton
from chatbot_ner.config import ner_logger, CHATBOT_NER_DATASTORE
from datastore import constants
from datastore.exceptions import DataStoreSettingsImproperlyConfiguredException, DataStoreRequestException
from datastore.exceptions import (EngineConnectionException, DataStoreSettingsImproperlyConfiguredException,
DataStoreRequestException)
from language_utilities.constant import ENGLISH_LANG

from ner_v2.detectors.textual.queries import _generate_multi_entity_es_query, \
_parse_multi_entity_es_results
from lib.singleton import Singleton
from ner_v2.detectors.textual.queries import _generate_multi_entity_es_query, _parse_multi_entity_es_results


# NOTE: connection is a misleading term for this implementation, rather what we have are clients that manage
# any real connections on their own
class ElasticSearchDataStore(six.with_metaclass(Singleton, object)):
"""
Class responsible for holding connections and performing search in
ElasticSearch DB.
Instance responsible for holding connections and performing search in ElasticSearch DB.
Used as a singleton in this module.
"""

def __init__(self):
self._engine_name = constants.ELASTICSEARCH
self._kwargs = {}
self._conns = {}
self._connection_settings = {}
self._connection = None
self._index_name = None

self.query_data = []

# configure variables and connection
self._doc_type = None
self._configure_store()

# define doc type
self.doc_type = self._connection_settings[
constants.ELASTICSEARCH_DOC_TYPE]
def _clear_connections(self):
self._conns = {}

def _configure_store(self, **kwargs):
def _configure_store(self):
"""
Configure self variables and connection.
Also add default connection to registry with alias `default`
Configure self variables and connection settings.
"""
self._connection_settings = CHATBOT_NER_DATASTORE. \
get(self._engine_name)

self._connection_settings = CHATBOT_NER_DATASTORE.get(self._engine_name)
if self._connection_settings is None:
raise DataStoreSettingsImproperlyConfiguredException()

self._check_doc_type_for_elasticsearch()
self._index_name = self._connection_settings[constants.ELASTICSEARCH_ALIAS]
self._connection = self.connect(**self._connection_settings)
self._doc_type = self._connection_settings[constants.ELASTICSEARCH_DOC_TYPE]

self._conns['default'] = self._connection

def add_new_connection(self, alias, conn):
"""
Add new connection object, which can be directly passed through as-is to
the connection registry.
"""
self._conns[alias] = conn
@property
def _default_connection(self):
return self._get_or_create_new_connection(alias='default', **self._connection_settings)

def get_or_create_new_connection(self, alias="default", **kwargs):
def _get_or_create_new_connection(self, alias='default', **kwargs):
"""
Retrieve a connection with given alias.
Construct it if necessary (only when configuration was passed to us).
If some non-string alias has been passed through it assume a client instance
and will just return it as-is.
Raises ``KeyError`` if no client (or its definition) is registered
under the alias.
"""

if not isinstance(alias, six.string_types):
return alias

# connection already established
try:
return self._conns[alias]
except KeyError:
pass

# if not, try to create it a new connection
try:
if alias not in self._conns:
conn = self.connect(**kwargs)
if not conn:
raise EngineConnectionException(engine=self._engine_name)
self._conns[alias] = conn
except KeyError:
# no connection and no kwargs to set one up
raise KeyError("There is no connection with alias %r." % alias)

# check if this is necessary here
return self._conns[alias]

def _check_doc_type_for_elasticsearch(self):
"""
Checks if doc_type is present in connection settings, if not an exception is raised
Expand All @@ -101,30 +72,25 @@ def _check_doc_type_for_elasticsearch(self):
DataStoreSettingsImproperlyConfiguredException if doc_type was not found in
connection settings
"""
# TODO: This check should be during init or boot
if constants.ELASTICSEARCH_DOC_TYPE not in self._connection_settings:
ner_logger.debug("No doc type is present")
ner_logger.debug("No doc type is present in chatbot_ner.config.CHATBOT_NER_DATASTORE")
raise DataStoreSettingsImproperlyConfiguredException(
'Elasticsearch needs doc_type. Please configure ES_DOC_TYPE in your environment')

def generate_query_data(self, entities, texts, fuzziness_threshold=1,
search_language_script=ENGLISH_LANG):

# check if text is string
def generate_query_data(self, entities, texts, fuzziness_threshold=1, search_language_script=ENGLISH_LANG):
if isinstance(texts, str):
texts = [texts]

index_header = json.dumps({'index': self._index_name, 'type': self.doc_type})

data = list(chain.from_iterable([[index_header,
json.dumps(_generate_multi_entity_es_query(
entities=entities,
text=each,
fuzziness_threshold=fuzziness_threshold,
language_script=search_language_script))]
for each in texts]))
index_header = json.dumps({'index': self._index_name, 'type': self._doc_type})
query_parts = []
for text in texts:
query_parts.append(index_header)
text_query = _generate_multi_entity_es_query(entities=entities, text=text,
fuzziness_threshold=fuzziness_threshold,
language_script=search_language_script)
query_parts.append(json.dumps(text_query))

return data
return query_parts

def get_multi_entity_results(self, entities, texts, fuzziness_threshold=1,
search_language_script=ENGLISH_LANG, **kwargs):
Expand Down Expand Up @@ -162,24 +128,19 @@ def get_multi_entity_results(self, entities, texts, fuzziness_threshold=1,
('TMOS', 'TMOS'), ('G.', 'G Pulla Reddy Sweets')])}
]
"""

self._check_doc_type_for_elasticsearch()
request_timeout = self._connection_settings.get('request_timeout', 20)
index_name = self._index_name

data = []
for entity_list, text_list in zip(entities, texts):
data.extend(self.generate_query_data(entity_list, text_list, fuzziness_threshold,
search_language_script))
data.extend(self.generate_query_data(entity_list, text_list, fuzziness_threshold, search_language_script))

# add `\n` for each index_header and query data text entry
query_data = '\n'.join(data)

kwargs = dict(body=query_data, doc_type=self.doc_type, index=index_name,
request_timeout=request_timeout)
kwargs = dict(body=query_data, doc_type=self._doc_type, index=index_name, request_timeout=request_timeout)
response = None
try:
response = self._run_es_search(self._connection, **kwargs)
response = self._run_es_search(self._default_connection, **kwargs)
results = _parse_multi_entity_es_results(response.get("responses"))
except Exception as e:
raise DataStoreRequestException(f'Error in datastore query on index: {index_name}', engine='elasticsearch',
Expand All @@ -202,9 +163,10 @@ def connect(connection_url=None, host=None, port=None, user=None, password=None,
kwargs: any additional arguments will be passed on to the Transport class and, subsequently,
to the Connection instances.
Returns:
Elasticsearch client connection object
Elasticsearch client connection object or None
"""
# TODO: This does not account for ES_SCHEME unless the connection is being made via `connection_url`
connection = None
if user and password:
kwargs = dict(kwargs, http_auth=(user, password))
Expand All @@ -230,7 +192,6 @@ def _run_es_search(connection, **kwargs):
Returns:
dictionary, search results from elasticsearch.ElasticSearch.msearch
"""

return connection.msearch(**kwargs)

@staticmethod
Expand All @@ -247,8 +208,7 @@ def _get_dynamic_fuzziness_threshold(fuzzy_setting):
"""
if isinstance(fuzzy_setting, six.string_types):
if constants.ELASTICSEARCH_VERSION_MAJOR > 6 \
or (constants.ELASTICSEARCH_VERSION_MAJOR == 6
and constants.ELASTICSEARCH_VERSION_MINOR >= 2):
or (constants.ELASTICSEARCH_VERSION_MAJOR == 6 and constants.ELASTICSEARCH_VERSION_MINOR >= 2):
return fuzzy_setting
return 'auto'

Expand Down
Loading

0 comments on commit 44c38fa

Please sign in to comment.