feat: support opendistro (#45)
* feat: support opendistro

* support empty indices

* refactor

* reorg and prepare for opendistro tests

* Tests for opendistro

* Tests for opendistro

* more opendistro

* tests pass locally

* test opendistro dialect

* lint

* update README

* implement a SQLAlchemy do_ping

* remove unnecessary do_ping

* better type annotations

* adds mypy as a dev requirement

* Update es/basesqlalchemy.py

Co-authored-by: Ville Brofeldt <33317356+villebro@users.noreply.github.com>

* Update es/basesqlalchemy.py

Co-authored-by: Ville Brofeldt <33317356+villebro@users.noreply.github.com>

* Update es/basesqlalchemy.py

Co-authored-by: Ville Brofeldt <33317356+villebro@users.noreply.github.com>

* Update es/basesqlalchemy.py

Co-authored-by: Ville Brofeldt <33317356+villebro@users.noreply.github.com>

* Update es/basesqlalchemy.py

Co-authored-by: Ville Brofeldt <33317356+villebro@users.noreply.github.com>

* Update es/basesqlalchemy.py

Co-authored-by: Ville Brofeldt <33317356+villebro@users.noreply.github.com>

* Update es/opendistro/api.py

Co-authored-by: Ville Brofeldt <33317356+villebro@users.noreply.github.com>

* Update es/basesqlalchemy.py

Co-authored-by: Ville Brofeldt <33317356+villebro@users.noreply.github.com>

* use namedtuple for better readability

* Update es/basesqlalchemy.py

Co-authored-by: Ville Brofeldt <33317356+villebro@users.noreply.github.com>

* fix type annotations

* lint

* update README ref experimental engine

Co-authored-by: Ville Brofeldt <33317356+villebro@users.noreply.github.com>
dpgaspar and villebro authored Jan 21, 2021
1 parent 698006e commit f0b013e
Showing 14 changed files with 394 additions and 219 deletions.
23 changes: 21 additions & 2 deletions .github/workflows/ci.yml
@@ -31,19 +31,28 @@ jobs:
run: black --check setup.py es
- name: flake8
run: flake8 es
- name: mypy
run: mypy es

tests:
runs-on: ubuntu-18.04
strategy:
matrix:
python-version: [3.6, 3.7, 3.8]
services:
postgres:
elasticsearch:
image: elasticsearch:7.3.2
env:
discovery.type: single-node
ports:
- 9200:9200
opendistro:
image: amazon/opendistro-for-elasticsearch:1.12.0
env:
discovery.type: single-node
ports:
- 9400:9200

steps:
- uses: actions/checkout@v2
- name: Setup Python
@@ -58,8 +67,18 @@ jobs:
pip install -r requirements.txt
pip install -r requirements-dev.txt
pip install -e .
- name: Run tests
- name: Run tests on Elasticsearch
run: |
export ES_URI="http://localhost:9200"
nosetests -v --with-coverage --cover-package=es es.tests
- name: Run tests on Opendistro
run: |
export ES_DRIVER=odelasticsearch
export ES_URI="https://admin:admin@localhost:9400"
export ES_PASSWORD=admin
export ES_PORT=9400
export ES_SCHEME=https
export ES_USER=admin
nosetests -v --with-coverage --cover-package=es es.tests
- name: Upload code coverage
run: |
18 changes: 13 additions & 5 deletions README.md
@@ -8,9 +8,12 @@

`elasticsearch-dbapi` implements a DBAPI (PEP-249) and SQLAlchemy dialect
that enables query-only SQL access to Elasticsearch clusters.

On Elastic Elasticsearch:
Uses Elastic X-Pack [SQL API](https://www.elastic.co/guide/en/elasticsearch/reference/current/xpack-sql.html)

We are currently building support for `opendistro/_sql` API for AWS Elasticsearch Service / [Open Distro SQL](https://opendistro.github.io/for-elasticsearch-docs/docs/sql/)
On AWS ES, opendistro Elasticsearch:
[Open Distro SQL](https://opendistro.github.io/for-elasticsearch-docs/docs/sql/)

This library supports Elasticsearch 7.X versions.

@@ -23,7 +26,7 @@ $ pip install elasticsearch-dbapi
To install support for AWS Elasticsearch Service / [Open Distro](https://opendistro.github.io/for-elasticsearch/features/SQL%20Support.html):

```bash
$ pip install elasticsearch-dbapi[aws]
$ pip install elasticsearch-dbapi[opendistro]
```

### Usage:
@@ -131,8 +134,7 @@ $ nosetests -v
### Special case for sql opendistro endpoint (AWS ES)

AWS ES exposes the opendistro SQL plugin, and it follows a different SQL dialect.
Because of dialect and API response differences, we provide limited support for opendistro SQL
on this package using the `odelasticsearch` driver:
Using the `odelasticsearch` driver:

```python
from sqlalchemy.engine import create_engine
@@ -159,6 +161,9 @@ curs = conn.cursor().execute(
print([row for row in curs])
```

To connect to the provided Opendistro ES on `docker-compose` use the following URI:
`odelasticsearch+https://admin:admin@localhost:9400/?verify_certs=False`
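The parts of that URI can be checked with the standard library before it is handed to SQLAlchemy. This is a minimal sketch and not part of the package: `describe_uri` is a hypothetical helper, and reading the prefix as `<driver>+<transport scheme>` is an assumption based on the URI shown above.

```python
from urllib.parse import parse_qs, urlsplit

URI = "odelasticsearch+https://admin:admin@localhost:9400/?verify_certs=False"


def describe_uri(uri: str) -> dict:
    """Split a SQLAlchemy-style URI into its parts (hypothetical helper)."""
    parts = urlsplit(uri)
    # Assumption: the prefix reads "<driver>+<transport scheme>".
    driver, _, scheme = parts.scheme.partition("+")
    return {
        "driver": driver,
        "scheme": scheme,
        "user": parts.username,
        "password": parts.password,
        "host": parts.hostname,
        "port": parts.port,
        # Query values arrive as strings; any conversion is left to the driver.
        "query": {key: values[0] for key, values in parse_qs(parts.query).items()},
    }


print(describe_uri(URI))
```

Note that `verify_certs` comes back as the string `"False"`, so whatever consumes it has to convert it to a boolean itself.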

### Known limitations

This library does not yet support the following features:
@@ -168,4 +173,7 @@ SQLAlchemy `get_columns` will exclude them.
- `object` and `nested` column types are not well supported and are converted to strings
- Indexes whose names start with `.`
- GEO points are not currently well-supported and are converted to strings
- Very limited support for AWS ES, no AWS Auth yet for example

- AWS ES (opendistro elasticsearch) is supported (still beta), known limitations are:
* You are only able to `GROUP BY` keyword fields (new [experimental](https://github.com/opendistro-for-elasticsearch/sql#experimental)
opendistro SQL already supports it)
2 changes: 1 addition & 1 deletion docker-compose.yml
@@ -8,7 +8,7 @@ services:
- 9300:9300

opendistro:
image: amazon/opendistro-for-elasticsearch:1.7.0
image: amazon/opendistro-for-elasticsearch:1.12.0
env_file: .env
ports:
- 9400:9200
95 changes: 79 additions & 16 deletions es/baseapi.py
@@ -1,3 +1,6 @@
from collections import namedtuple
from typing import Dict, List, Optional, Tuple

from elasticsearch import exceptions as es_exceptions
from es import exceptions
from six import string_types
@@ -7,6 +10,14 @@
from .const import DEFAULT_FETCH_SIZE, DEFAULT_SCHEMA, DEFAULT_SQL_PATH


CursorDescriptionRow = namedtuple(
"CursorDescriptionRow",
["name", "type", "display_size", "internal_size", "precision", "scale", "null_ok"],
)

CursorDescriptionType = List[CursorDescriptionRow]


class Type(object):
STRING = 1
NUMBER = 2
@@ -17,25 +28,80 @@ class Type(object):
def check_closed(f):
"""Decorator that checks if connection/cursor is closed."""

def g(self, *args, **kwargs):
def wrap(self, *args, **kwargs):
if self.closed:
raise exceptions.Error(
"{klass} already closed".format(klass=self.__class__.__name__)
)
return f(self, *args, **kwargs)

return g
return wrap


def check_result(f):
"""Decorator that checks if the cursor has results from `execute`."""

def g(self, *args, **kwargs):
def wrap(self, *args, **kwargs):
if self._results is None:
raise exceptions.Error("Called before `execute`")
return f(self, *args, **kwargs)

return g
return wrap
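
Review note on the guard decorators above: a minimal, self-contained sketch of how `check_closed` behaves on a stand-in cursor. `DemoCursor` and the local `Error` class are hypothetical and exist only for this illustration. `functools.wraps` is an addition that does not appear in the diff; it is used here so the wrapped method keeps its name and docstring.

```python
import functools


class Error(Exception):
    """Stand-in for es.exceptions.Error (hypothetical, for this sketch only)."""


def check_closed(f):
    """Raise if the object is already closed, otherwise call through."""

    @functools.wraps(f)  # not in the diff; preserves f.__name__ and f.__doc__
    def wrap(self, *args, **kwargs):
        if self.closed:
            raise Error(f"{self.__class__.__name__} already closed")
        return f(self, *args, **kwargs)

    return wrap


class DemoCursor:
    """Hypothetical cursor exposing only the `closed` flag the guard reads."""

    def __init__(self):
        self.closed = False

    @check_closed
    def close(self):
        """Close the cursor."""
        self.closed = True


cursor = DemoCursor()
cursor.close()
try:
    cursor.close()  # the second call hits the guard
except Error as exc:
    message = str(exc)
```

Renaming the inner function from `g` to `wrap` changes what introspection reports, but without `functools.wraps` every decorated method would still appear as `wrap` in tracebacks and help output.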


def get_type(data_type):
type_map = {
"text": Type.STRING,
"keyword": Type.STRING,
"integer": Type.NUMBER,
"half_float": Type.NUMBER,
"scaled_float": Type.NUMBER,
"geo_point": Type.STRING,
# TODO get a solution for nested type
"nested": Type.STRING,
"object": Type.STRING,
"date": Type.DATETIME,
"datetime": Type.DATETIME,
"short": Type.NUMBER,
"long": Type.NUMBER,
"float": Type.NUMBER,
"double": Type.NUMBER,
"bytes": Type.NUMBER,
"boolean": Type.BOOLEAN,
"ip": Type.STRING,
"interval_minute_to_second": Type.STRING,
"interval_hour_to_second": Type.STRING,
"interval_hour_to_minute": Type.STRING,
"interval_day_to_second": Type.STRING,
"interval_day_to_minute": Type.STRING,
"interval_day_to_hour": Type.STRING,
"interval_year_to_month": Type.STRING,
"interval_second": Type.STRING,
"interval_minute": Type.STRING,
"interval_day": Type.STRING,
"interval_month": Type.STRING,
"interval_year": Type.STRING,
}
return type_map[data_type.lower()]


def get_description_from_columns(
columns: List[Dict[str, str]]
) -> CursorDescriptionType:
return [
(
CursorDescriptionRow(
column.get("name") if "alias" not in column else column.get("alias"),
get_type(column.get("type")),
None, # [display_size]
None, # [internal_size]
None, # [precision]
None, # [scale]
True, # [null_ok]
)
)
for column in columns
]
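
Review note on `get_description_from_columns`: the sketch below reproduces the alias-over-name rule on a trimmed type map so the resulting rows can be inspected in isolation. The `describe` function, the `TYPE_MAP` subset, and the sample column dicts are hypothetical; only the namedtuple fields and the numeric type codes are taken from the diff.

```python
from collections import namedtuple

CursorDescriptionRow = namedtuple(
    "CursorDescriptionRow",
    ["name", "type", "display_size", "internal_size", "precision", "scale", "null_ok"],
)

STRING, NUMBER = 1, 2  # same codes as Type.STRING and Type.NUMBER in the diff
TYPE_MAP = {"keyword": STRING, "text": STRING, "long": NUMBER}  # trimmed subset


def describe(columns):
    """Build DBAPI description rows, preferring `alias` over `name`."""
    return [
        CursorDescriptionRow(
            col.get("alias", col.get("name")),
            TYPE_MAP[col["type"].lower()],
            None,  # display_size
            None,  # internal_size
            None,  # precision
            None,  # scale
            True,  # null_ok
        )
        for col in columns
    ]


# Hypothetical column metadata, shaped like the dicts the function expects.
columns = [
    {"name": "country", "type": "keyword"},
    {"name": "COUNT(*)", "alias": "total", "type": "long"},
]
rows = describe(columns)
```

Because `CursorDescriptionRow` is a namedtuple, each row still unpacks as the plain 7-tuple that PEP 249 describes, while callers can also read `row.name` and `row.type` by attribute.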


class BaseConnection(object):
@@ -120,19 +186,19 @@ def __init__(self, url, es, **kwargs):
# this is set to an iterator after a successful query
self._results = None

@property
@property # type: ignore
@check_result
@check_closed
def rowcount(self):
def rowcount(self) -> int:
return len(self._results)

@check_closed
def close(self):
def close(self) -> None:
"""Close the cursor."""
self.closed = True

@check_closed
def execute(self, operation, parameters=None):
def execute(self, operation, parameters=None) -> "BaseCursor":
raise NotImplementedError # pragma: no cover

@check_closed
@@ -143,7 +209,7 @@ def executemany(self, operation, seq_of_parameters=None):

@check_result
@check_closed
def fetchone(self):
def fetchone(self) -> Optional[Tuple[str]]:
"""
Fetch the next row of a query result set, returning a single sequence,
or `None` when no more data is available.
@@ -155,7 +221,7 @@ def fetchone(self):

@check_result
@check_closed
def fetchmany(self, size=None):
def fetchmany(self, size: Optional[int] = None) -> List[Tuple[str]]:
"""
Fetch the next set of rows of a query result, returning a sequence of
sequences (e.g. a list of tuples). An empty sequence is returned when
@@ -167,7 +233,7 @@ def fetchmany(self, size=None):

@check_result
@check_closed
def fetchall(self):
def fetchall(self) -> List[Tuple[str]]:
"""
Fetch all (remaining) rows of a query result, returning them as a
sequence of sequences (e.g. a list of tuples). Note that the cursor's
@@ -202,18 +268,15 @@ def sanitize_query(self, query):
# remove dummy schema from queries
return query.replace(f'FROM "{DEFAULT_SCHEMA}".', "FROM ")

def elastic_query(self, query: str, csv=False):
def elastic_query(self, query: str):
"""
Request an http SQL query to elasticsearch
"""
self.description = None
# Sanitize query
query = self.sanitize_query(query)
payload = {"query": query, "fetch_size": self.fetch_size}
if csv:
path = f"/{self.sql_path}/?format=csv"
else:
path = f"/{self.sql_path}/"
path = f"/{self.sql_path}/"
try:
resp = self.es.transport.perform_request("POST", path, body=payload)
except es_exceptions.ConnectionError as e: