Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MySQL Support #69

Merged
merged 9 commits into from
Jan 30, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions rd_service/data/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
import qr
import redis
import time
import query_runner
import worker
import settings
from utils import gen_query_hash


Expand Down Expand Up @@ -153,8 +153,13 @@ def run_query(self, *args):
def start_workers(self, workers_count, connection_string):
if self.workers:
return self.workers

runner = query_runner.redshift(connection_string)

if settings.CONNECTION_ADAPTER == "mysql":
import query_runner_mysql
runner = query_runner_mysql.mysql(connection_string)
else:
import query_runner
runner = query_runner.redshift(connection_string)

redis_connection_params = self.redis_connection.connection_pool.connection_kwargs
self.workers = [worker.Worker(self, redis_connection_params, runner)
Expand Down
22 changes: 11 additions & 11 deletions rd_service/data/query_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,17 @@
Because the worker just passes the query, this can be used with any data store that has some sort of
query language (for example: HiveQL).
"""
import logging
import json
import psycopg2
import sys
import select
from .utils import JSONEncoder


def redshift(connection_string):
def column_friendly_name(column_name):
return column_name

def wait(conn):
while 1:
state = conn.poll()
Expand All @@ -28,24 +28,24 @@ def wait(conn):
select.select([conn.fileno()], [], [])
else:
raise psycopg2.OperationalError("poll() returned %s" % state)

def query_runner(query):
connection = psycopg2.connect(connection_string, async=True)
wait(connection)

cursor = connection.cursor()

try:
cursor.execute(query)
wait(connection)

column_names = [col.name for col in cursor.description]

rows = [dict(zip(column_names, row)) for row in cursor]
columns = [{'name': col.name,
'friendly_name': column_friendly_name(col.name),
'type': None} for col in cursor.description]

data = {'columns': columns, 'rows': rows}
json_data = json.dumps(data, cls=JSONEncoder)
error = None
Expand All @@ -61,7 +61,7 @@ def query_runner(query):
raise sys.exc_info()[1], None, sys.exc_info()[2]
finally:
connection.close()

return json_data, error

return query_runner
return query_runner
56 changes: 56 additions & 0 deletions rd_service/data/query_runner_mysql.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
"""
QueryRunner is the function that the workers use, to execute queries. This is the MySQL
version; the Redshift (PostgreSQL in fact) version lives in query_runner.py, and additional
databases can be supported by writing another runner in the same way.

Because the worker just passes the query, this can be used with any data store that has some sort of
query language (for example: HiveQL).
"""
import logging
import json
import MySQLdb
import sys
import select
from .utils import JSONEncoder

def mysql(connection_string):
    """Build a query runner bound to a MySQL connection string.

    ``connection_string`` is a ';'-separated list of ``key=value`` entries
    (one optional trailing ';' is ignored). Only the values are used: they
    are passed positionally to ``MySQLdb.connect``, so the order of the
    entries is significant.

    Returns a function ``query_runner(query)`` that opens a new connection,
    executes ``query`` and returns a ``(json_data, error)`` tuple:

    * on success ``json_data`` is a JSON string with ``columns`` and ``rows``
      and ``error`` is ``None``;
    * when MySQLdb raises, ``json_data`` is ``None`` and ``error`` holds the
      error message.

    Any other exception propagates to the caller. The connection is closed
    in every case once it has been opened.
    """
    if connection_string.endswith(';'):
        connection_string = connection_string[0:-1]

    def query_runner(query):
        # Split on the first '=' only, so a value (e.g. a password) may
        # itself contain '=' without being truncated.
        connections_params = [entry.split('=', 1)[1]
                              for entry in connection_string.split(';')]
        connection = MySQLdb.connect(*connections_params)
        cursor = connection.cursor()

        logging.debug("mysql got query: %s", query)

        try:
            cursor.execute(query)

            fetched_rows = cursor.fetchall()

            # description is None for statements that produce no result set;
            # treat that as "no columns" instead of failing on it.
            description = cursor.description or ()
            column_names = [col[0] for col in description]

            rows = [dict(zip(column_names, row)) for row in fetched_rows]

            columns = [{'name': col_name,
                        'friendly_name': col_name,
                        'type': None} for col_name in column_names]

            data = {'columns': columns, 'rows': rows}
            json_data = json.dumps(data, cls=JSONEncoder)
            error = None
            cursor.close()
        except MySQLdb.Error as e:
            json_data = None
            # MySQLdb errors usually carry (code, message) in args. The
            # `message` attribute is empty for multi-argument exceptions,
            # which would make a failed query look like it had no error.
            error = e.args[1] if len(e.args) > 1 else str(e)
        finally:
            # Non-MySQLdb exceptions are deliberately not caught: they
            # propagate unchanged after the connection is closed.
            connection.close()

        return json_data, error

    return query_runner
5 changes: 5 additions & 0 deletions rd_service/settings_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@
import django.conf

REDIS_URL = "redis://localhost:6379"

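# Query runner to use: "mysql" selects the MySQL runner; any other value (e.g. "pg") selects the Redshift/PostgreSQL runner.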
CONNECTION_ADAPTER = "mysql"
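# Connection string for the database that is used to run queries against.
# Its format must match CONNECTION_ADAPTER. For mysql the values are passed to the
# driver positionally, so keep the keys in the order shown in the example.
# -- example mysql CONNECTION_STRING = "Server=;User=;Pwd=;Database="
# -- example pg CONNECTION_STRING = "user= password= host= port=5439 dbname="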
CONNECTION_STRING = "user= password= host= port=5439 dbname="
# Connection string for the operational databases (where we store the queries, results, etc)
INTERNAL_DB_CONNECTION_STRING = "dbname=postgres"
Expand Down