Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MySQL Support #69

Merged
merged 9 commits into from
Jan 30, 2014
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 8 additions & 3 deletions rd_service/data/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@
import qr
import redis
import time
import query_runner
import worker
import settings
from utils import gen_query_hash


Expand Down Expand Up @@ -153,8 +153,13 @@ def run_query(self, *args):
def start_workers(self, workers_count, connection_string):
if self.workers:
return self.workers

runner = query_runner.redshift(connection_string)

if settings.CONNECTION_ADAPTER == "mysql":
import query_runner_mysql
runner = query_runner_mysql.mysql(connection_string)
else:
import query_runner
runner = query_runner.redshift(connection_string)

self.workers = [worker.Worker(self, runner) for _ in range(workers_count)]
for w in self.workers:
Expand Down
63 changes: 32 additions & 31 deletions rd_service/data/query_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,62 +6,63 @@
Because the worker just passes the query, this can be used with any data store that has some sort of
query language (for example: HiveQL).
"""
import logging
import json
import psycopg2
import MySQLdb
import sys
import select
from .utils import JSONEncoder


def redshift(connection_string):
def mysql(connection_string):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You kept the mysql one here, but moved the redshift one to mysql_query_runner.py...

def column_friendly_name(column_name):
return column_name

def wait(conn):
while 1:
state = conn.poll()
if state == psycopg2.extensions.POLL_OK:
break
elif state == psycopg2.extensions.POLL_WRITE:
select.select([], [conn.fileno()], [])
elif state == psycopg2.extensions.POLL_READ:
select.select([conn.fileno()], [], [])
else:
raise psycopg2.OperationalError("poll() returned %s" % state)

def query_runner(query):
connection = psycopg2.connect(connection_string, async=True)
wait(connection)

# TODO: remove these lines
import settings
connection_string = settings.CONNECTION_STRING
# ENDTODO


if connection_string.endswith(';'):
connection_string = connection_string[0:-1]

connections_params = [entry.split('=')[1] for entry in connection_string.split(';')]
connection = MySQLdb.connect(*connections_params)
cursor = connection.cursor()


logging.debug("mysql got query: %s", query)

try:
cursor.execute(query)
wait(connection)

data = cursor.fetchall()

column_names = [col.name for col in cursor.description]
num_fields = len(cursor.description)
column_names = [i[0] for i in cursor.description]

rows = [dict(zip(column_names, row)) for row in data]

rows = [dict(zip(column_names, row)) for row in cursor]
columns = [{'name': col.name,
'friendly_name': column_friendly_name(col.name),
'type': None} for col in cursor.description]
columns = [{'name': col_name,
'friendly_name': column_friendly_name(col_name),
'type': None} for col_name in column_names]

data = {'columns': columns, 'rows': rows}
json_data = json.dumps(data, cls=JSONEncoder)
error = None
cursor.close()
except psycopg2.DatabaseError as e:
except MySQLdb.Error, e:
json_data = None
error = e.message
except KeyboardInterrupt:
connection.cancel()
error = "Query cancelled by user."
json_data = None
except Exception as e:
raise sys.exc_info()[1], None, sys.exc_info()[2]
finally:
connection.close()

return json_data, error



return query_runner



125 changes: 125 additions & 0 deletions rd_service/data/query_runner_mysql.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
"""
QueryRunner is the function that the workers use to execute queries. This module provides a
MySQL version and a Redshift (PostgreSQL in fact) version; runners for additional databases
can easily be written in the same way.

Because the worker just passes the query, this can be used with any data store that has some sort of
query language (for example: HiveQL).
"""
import logging
import json
import psycopg2
import MySQLdb
import sys
import select
from .utils import JSONEncoder

def mysql(connection_string):
    """Build a query runner that executes queries against a MySQL database.

    ``connection_string`` is a ``;``-separated list of ``key=value`` entries,
    for example ``"Server=;User=;Pwd=;Database="``. Only the values are used:
    they are passed to ``MySQLdb.connect`` positionally, in the order in which
    they appear in the string. A trailing ``;`` is ignored.

    Returns a function ``query_runner(query)`` that opens a fresh connection,
    runs ``query`` and returns a ``(json_data, error)`` tuple. On success
    ``json_data`` is a JSON document with ``columns`` and ``rows`` and
    ``error`` is ``None``; on a ``MySQLdb.Error`` ``json_data`` is ``None`` and
    ``error`` holds the error text. Any other exception propagates to the
    caller.
    """
    # Parse once, under a separate name: rebinding ``connection_string`` inside
    # the closure would turn it into a local there and hide the argument.
    # Splitting on the first '=' only keeps values that themselves contain '='
    # (e.g. passwords) intact.
    connection_params = [entry.split('=', 1)[1]
                         for entry in connection_string.split(';')
                         if entry.strip()]

    def column_friendly_name(column_name):
        return column_name

    def query_runner(query):
        connection = MySQLdb.connect(*connection_params)
        cursor = connection.cursor()

        logging.debug("mysql got query: %s", query)

        try:
            cursor.execute(query)
            data = cursor.fetchall()

            # ``description`` is None when the statement produced no result
            # set; treat that as a result with no columns.
            column_names = [col[0] for col in cursor.description or ()]

            rows = [dict(zip(column_names, row)) for row in data]
            columns = [{'name': col_name,
                        'friendly_name': column_friendly_name(col_name),
                        'type': None} for col_name in column_names]

            json_data = json.dumps({'columns': columns, 'rows': rows},
                                   cls=JSONEncoder)
            error = None
            cursor.close()
        except MySQLdb.Error as e:
            json_data = None
            # MySQLdb errors carry (errno, message); ``e.message`` is empty
            # for multi-argument exceptions, so read the text from ``args``.
            error = e.args[1] if len(e.args) > 1 else str(e)
        finally:
            connection.close()

        return json_data, error

    return query_runner



def redshift(connection_string):
    """Build a query runner that executes queries against Redshift/PostgreSQL.

    ``connection_string`` is passed unchanged to ``psycopg2.connect``.

    Returns a function ``query_runner(query)`` that opens a fresh asynchronous
    connection, runs ``query`` and returns a ``(json_data, error)`` tuple. On
    success ``json_data`` is a JSON document with ``columns`` and ``rows`` and
    ``error`` is ``None``. On a ``psycopg2.DatabaseError`` or a
    ``KeyboardInterrupt`` ``json_data`` is ``None`` and ``error`` holds a
    message. Any other exception propagates to the caller.
    """
    def column_friendly_name(column_name):
        return column_name

    def wait(conn):
        # Poll the asynchronous connection until it reports POLL_OK, blocking
        # in select() on its file descriptor while it wants to read or write.
        while True:
            state = conn.poll()
            if state == psycopg2.extensions.POLL_OK:
                break
            elif state == psycopg2.extensions.POLL_WRITE:
                select.select([], [conn.fileno()], [])
            elif state == psycopg2.extensions.POLL_READ:
                select.select([conn.fileno()], [], [])
            else:
                raise psycopg2.OperationalError("poll() returned %s" % state)

    def query_runner(query):
        # ``async`` is a reserved word from Python 3.7 on, so the flag is
        # passed through dict unpacking to stay valid on every interpreter.
        connection = psycopg2.connect(connection_string, **{'async': True})
        wait(connection)

        cursor = connection.cursor()

        try:
            cursor.execute(query)
            wait(connection)

            column_names = [col.name for col in cursor.description]

            rows = [dict(zip(column_names, row)) for row in cursor]
            columns = [{'name': col_name,
                        'friendly_name': column_friendly_name(col_name),
                        'type': None} for col_name in column_names]

            json_data = json.dumps({'columns': columns, 'rows': rows},
                                   cls=JSONEncoder)
            error = None
            cursor.close()
        except psycopg2.DatabaseError as e:
            json_data = None
            # ``e.message`` is deprecated and missing on Python 3.
            error = str(e)
        except KeyboardInterrupt:
            connection.cancel()
            error = "Query cancelled by user."
            json_data = None
        finally:
            # Other exceptions are not caught: they propagate unchanged after
            # the connection has been closed here.
            connection.close()

        return json_data, error

    return query_runner
5 changes: 5 additions & 0 deletions rd_service/settings_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,12 @@
import django.conf

REDIS_URL = "redis://localhost:6379"

# Either "pg" or "mysql"
CONNECTION_ADAPTER = "mysql"
# Connection string for the database that is used to run queries against
# -- example mysql CONNECTION_STRING = "Server=;User=;Pwd=;Database="
# -- example pg CONNECTION_STRING = "user= password= host= port=5439 dbname="
CONNECTION_STRING = "user= password= host= port=5439 dbname="
# Connection string for the operational databases (where we store the queries, results, etc)
INTERNAL_DB_CONNECTION_STRING = "dbname=postgres"
Expand Down