Test: add Kibana's sample data to the tests (#123)
* add Kibana's sample data to the tests

- index the data used by Kibana as sample data into the local node;
- select and fetch all the columns from the "flights" set;

- "ecommerce" and "logs" require server-side changes to be able to use
them.

* pin kibana sample data source to specific commit

(cherry picked from commit b6002cd)
bpintea committed Mar 6, 2019
1 parent b8c2e71 commit 2eab310
Showing 2 changed files with 101 additions and 13 deletions.
100 changes: 88 additions & 12 deletions test/integration/data.py
@@ -11,6 +11,8 @@
import time
import hashlib
import os
import re
import gzip

from elasticsearch import Elasticsearch

@@ -143,6 +145,9 @@

ES_DATASET_BASE_URL = "https://raw.githubusercontent.com/elastic/elasticsearch/6857d305270be3d987689fda37cc84b7bc18fbb3/x-pack/plugin/sql/qa/src/main/resources/"

KIBANA_SAMPLES_BASE_URL = "https://raw.githubusercontent.com/elastic/kibana/54e498200b8b1a265becf1b27a6958e613acc3d1/src/legacy/server/sample_data/data_sets"
KIBANA_INDEX_PREFIX = "kibana_sample_data_"

# Python slows down when concatenating many long strings, so the bulk body is built in fixed-size batches
BATCH_SIZE = 500

@@ -195,6 +200,11 @@ class TestData(object):
EMPLOYEES_FILE = "employees.csv"
EMPLOYEES_INDEX = "employees"

ECOMMERCE_INDEX = KIBANA_INDEX_PREFIX + "ecommerce"
FLIGHTS_INDEX = KIBANA_INDEX_PREFIX + "flights"
LOGS_INDEX = KIBANA_INDEX_PREFIX + "logs"


# loaded CSV attributes
_csv_md5 = None
_csv_header = None
@@ -236,7 +246,10 @@ def _docs_to_ndjson_batch(self, docs, index_string):
ndjson = ""
for doc in docs:
ndjson += index_string + "\n"
-            ndjson += json.dumps(doc) + "\n"
+            if type(doc) is str:
+                ndjson += doc + "\n"
+            else:
+                ndjson += json.dumps(doc) + "\n"
return ndjson
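    # For reference, each batch built above is standard _bulk NDJSON: one
    # action line followed by one source line per document, e.g. (field
    # values purely illustrative):
    #
    #   {"index": {"_index": "kibana_sample_data_flights"}}
    #   {"FlightNum": "9HY9SWR", "DestCountry": "AU"}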

def _docs_to_ndjson(self, index_name, docs):
@@ -247,7 +260,7 @@ def _docs_to_ndjson(self, index_name, docs):
for i in range(0, len(docs), BATCH_SIZE):
ndjson = self._docs_to_ndjson_batch(docs[i : i + BATCH_SIZE], index_string)
ndjsons.append(ndjson)

return ndjsons if 1 < len(ndjsons) else ndjsons[0]


@@ -298,17 +311,20 @@ def _prepare_tableau_load(self, file_name, index_name, index_template):

return ndjson

-    def _post_ndjson(self, ndjson, index_name, pipeline_name=None):
+    def _post_ndjson(self, ndjsons, index_name, pipeline_name=None):
url = "http://localhost:%s/%s/_doc/_bulk" % (Elasticsearch.ES_PORT, index_name)
if pipeline_name:
url += "?pipeline=%s" % pipeline_name
-        with requests.post(url, data=ndjson, headers = {"Content-Type": "application/x-ndjson"}, auth=REQ_AUTH) as req:
-            if req.status_code != 200:
-                raise Exception("bulk POST to %s failed with code: %s (content: %s)" % (index_name, req.status_code,
-                        req.text))
-            reply = json.loads(req.text)
-            if reply["errors"]:
-                raise Exception("bulk POST to %s failed with content: %s" % (index_name, req.text))
+        if type(ndjsons) is not list:
+            ndjsons = [ndjsons]
+        for n in ndjsons:
+            with requests.post(url, data=n, headers = {"Content-Type": "application/x-ndjson"}, auth=REQ_AUTH) as req:
+                if req.status_code != 200:
+                    raise Exception("bulk POST to %s failed with code: %s (content: %s)" % (index_name, req.status_code,
+                            req.text))
+                reply = json.loads(req.text)
+                if reply["errors"]:
+                    raise Exception("bulk POST to %s failed with content: %s" % (index_name, req.text))

def _wait_for_results(self, index_name):
hits = 0
@@ -353,8 +369,7 @@ def _load_tableau_staples(self):
assert(isinstance(ndjsons, list))
if self.MODE_NOINDEX < self._mode:
self._delete_if_needed(self.STAPLES_INDEX)
-            for ndjson in ndjsons:
-                self._post_ndjson(ndjson, self.STAPLES_INDEX)
+            self._post_ndjson(ndjsons, self.STAPLES_INDEX)
self._wait_for_results(self.STAPLES_INDEX)

def _load_elastic_library(self):
@@ -371,11 +386,72 @@ def _load_elastic_employees(self):
self._post_ndjson(ndjson, self.EMPLOYEES_INDEX)
self._wait_for_results(self.EMPLOYEES_INDEX)


def _get_kibana_file(self, sample_name, is_mapping=True):
file_name = "field_mappings.js" if is_mapping else "%s.json.gz" % sample_name
if self._offline_dir:
path = os.path.join(self._offline_dir, sample_name, file_name)
with open(path, "r" if is_mapping else "rb") as f:
return f.read()
else:
url = KIBANA_SAMPLES_BASE_URL + "/" + sample_name + "/"
url += file_name
req = requests.get(url, timeout = Elasticsearch.REQ_TIMEOUT)
if req.status_code != 200:
raise Exception("failed to GET URL %s for index %s with: code: %s, body: %s" %
(url, sample_name, req.status_code, req.text))
return req.text if is_mapping else req.content
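    # note: this helper returns str for the .js mappings file but bytes for
    # the gzipped data file, so callers pick the matching decode path (the
    # data path runs through gzip.decompress() below)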

def _put_sample_template(self, sample_name, index_name):
mapping = self._get_kibana_file(sample_name, True)
        # strip JS comments, block and line, from the mappings file
        mapping = re.sub(re.compile(r"/\*.*?\*/", re.DOTALL), "", mapping)
        mapping = re.sub(r"//.*?\n", "", mapping)
        # translate the JS object literal into a Python dict literal
        brace_at = mapping.find("{")
        if brace_at < 0:
            raise Exception("mapping is in unknown format; original: %s" % mapping)
        mapping = mapping[brace_at:]
        mapping = re.sub(re.compile(r"([a-zA-Z_]+)\s?:", re.M), r"'\g<1>':", mapping)
        mapping = mapping.strip("\n;")
        mapping = "{\n'properties': %s\n}" % mapping
        mapping = "'mappings': %s\n" % mapping
        mapping = "{\n'index_patterns': '%s*',\n%s}" % (index_name, mapping)
        # eval the dict literal into a Python object (this tolerates the
        # trailing commas past the last member on a level, which JSON wouldn't)
        mapping = eval(mapping)
# PUT the built template
url = "http://localhost:%s/_template/%s_template" % (Elasticsearch.ES_PORT, index_name)
with requests.put(url, json=mapping, auth=REQ_AUTH, timeout=Elasticsearch.REQ_TIMEOUT) as req:
if req.status_code != 200:
raise Exception("PUT %s template failed with code: %s (content: %s)" % (index_name,
req.status_code, req.text))
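    # Illustrative walk-through of the translation above (the JS input shape
    # is an assumption about Kibana's field_mappings.js, not a verbatim copy):
    #
    #   JS source:      fieldMappings = { timestamp: { type: 'date' }, };
    #   after re.subs:  { 'timestamp': { 'type': 'date' }, }
    #   after wrapping: {'index_patterns': 'kibana_sample_data_flights*',
    #                    'mappings': {'properties': { 'timestamp': { 'type': 'date' }, }}}
    #
    # eval() then absorbs the trailing commas that plain json.loads() would reject.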

def _index_sample_data(self, sample_name, index_name):
docs = self._get_kibana_file(sample_name, False)
docs = gzip.decompress(docs)
docs = docs.decode("utf-8")
docs = docs.splitlines()
ndjsons = self._docs_to_ndjson(index_name, docs)
self._post_ndjson(ndjsons, index_name)

def _load_kibana_sample(self, index_name):
sample_name = index_name[len(KIBANA_INDEX_PREFIX):]
self._delete_if_needed(index_name)
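        # register the template before any document is indexed, so that field
        # types (dates, geo data) come from the sample's mapping rather than
        # from dynamic-mapping guesses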
self._put_sample_template(sample_name, index_name)
self._index_sample_data(sample_name, index_name)


def load(self):
self._load_tableau_calcs()
self._load_tableau_staples()

self._load_elastic_library()
self._load_elastic_employees()

self._load_kibana_sample(self.ECOMMERCE_INDEX)
self._load_kibana_sample(self.FLIGHTS_INDEX)
self._load_kibana_sample(self.LOGS_INDEX)

print("Data %s." % ("meta-processed" if self._mode == self.MODE_NOINDEX else "reindexed" if self._mode == \
self.MODE_REINDEX else "indexed"))
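
For a quick sanity check of what load() produces, the document count of a sample index can be queried directly against the local node. A minimal sketch, assuming the node listens on port 9200 and reusing the REQ_AUTH credentials this module already defines:

    import requests

    # hypothetical check: the port and credentials depend on the local test setup
    r = requests.get("http://localhost:9200/kibana_sample_data_flights/_count", auth=REQ_AUTH)
    print(r.json()["count"])  # documents actually indexed into the flights sample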

14 changes: 13 additions & 1 deletion test/integration/testing.py
@@ -79,6 +79,16 @@ def _clear_cursor(self, index_name):
curs2.fetchall()
# no exception raised -> passed

def _select_columns(self, index_name, columns):
with pyodbc.connect(self._dsn) as cnxn:
cnxn.autocommit = True
stmt = "select %s from %s" % (columns, index_name)
with cnxn.execute(stmt) as curs:
cnt = 0
while curs.fetchone():
cnt += 1 # no exception -> success
print("Selected %s rows from %s." % (cnt, index_name))

def _current_user(self):
with pyodbc.connect(self._dsn) as cnxn:
cnxn.autocommit = True
@@ -87,12 +97,14 @@ def _current_user(self):
raise Exception("current username not 'elastic': %s" % user)

def perform(self):
-        self._current_user()
self._as_csv(TestData.LIBRARY_INDEX)
self._as_csv(TestData.EMPLOYEES_INDEX)
self._count_all(TestData.CALCS_INDEX)
self._count_all(TestData.STAPLES_INDEX)
self._clear_cursor(TestData.LIBRARY_INDEX)
+        self._current_user()
+        self._select_columns(TestData.FLIGHTS_INDEX, "*")
+        # TODO: add ecommerce and logs once #39700 is addressed

print("Tests successful.")

