Skip to content

Commit

Permalink
Merge pull request #113 from DataShades/py3
Browse files Browse the repository at this point in the history
Python3 support
  • Loading branch information
kowh-ai authored Oct 13, 2020
2 parents a0a9227 + 9566054 commit 3092d82
Show file tree
Hide file tree
Showing 28 changed files with 1,941 additions and 1,156 deletions.
54 changes: 38 additions & 16 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,22 +1,44 @@
language: python

# the new trusty images of Travis cause build errors with psycopg2, see https://github.com/travis-ci/travis-ci/issues/8897
dist: trusty
group: deprecated-2017Q4

python:
- "2.7"
env:
# - CKANVERSION=master
- CKANVERSION=2.8
- CKANVERSION=2.7
os: linux
language: python

install:
- bash bin/travis-build.bash
services:
- postgresql
- redis
addons:
postgresql: "9.3"
script: sh bin/travis-run.sh
after_success: coveralls
sudo: required
- postgresql

script: bash bin/travis-run.bash
before_install:
- pip install codecov
after_success:
- codecov

jobs:
include:
- stage: Flake8
python: 2.7
env: FLAKE8=True
install:
- pip install flake8==3.5.0
- pip install pycodestyle==2.3.0
script:
- flake8 --version
# stop the build if there are Python syntax errors or undefined names
- flake8 . --count --select=E901,E999,F821,F822,F823 --show-source --statistics --exclude ckan
# exit-zero treats all errors as warnings. The GitHub editor is 127 chars wide
# - flake8 . --count --max-line-length=127 --statistics --exclude ckan --exit-zero
- stage: Tests
python: "2.7"
env: CKANVERSION=master
- python: "3.6"
env: CKANVERSION=master
- python: "2.7"
env: CKANVERSION=2.8
- python: "2.7"
env: CKANVERSION=2.7

cache:
directories:
- $HOME/.cache/pip
9 changes: 1 addition & 8 deletions bin/travis-run.sh → bin/travis-run.bash
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,7 @@ flake8 --version
# stop the build if there are Python syntax errors or undefined names
flake8 . --count --select=E901,E999,F821,F822,F823 --show-source --statistics --exclude ckan,ckanext-xloader

nosetests --ckan \
--nologcapture \
--with-pylons=subdir/test.ini \
--with-coverage \
--cover-package=ckanext.xloader \
--cover-inclusive \
--cover-erase \
--cover-tests
pytest --ckan-ini subdir/test.ini --cov=ckanext.xloader ckanext/xloader/tests

# strict linting
flake8 . --count --max-complexity=27 --max-line-length=127 --statistics --exclude ckan,ckanext-xloader
17 changes: 13 additions & 4 deletions ckanext/xloader/action.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
# encoding: utf-8

from __future__ import absolute_import
from builtins import str
import logging
import json
import datetime

from dateutil.parser import parse as parse_date

import ckan.model as model
import ckan.lib.navl.dictization_functions
import ckan.logic as logic
import ckan.plugins as p
from ckan.logic import side_effect_free

import ckanext.xloader.schema
import interfaces as xloader_interfaces
import jobs
import db
from . import interfaces as xloader_interfaces
from . import jobs
from . import db
try:
enqueue_job = p.toolkit.enqueue_job
except AttributeError:
Expand Down Expand Up @@ -108,7 +111,7 @@ def xloader_submit(context, data_dict):
if existing_task.get('state') == 'pending':
import re # here because it takes a moment to load
queued_res_ids = [
re.search(r"'resource_id': u'([^']+)'",
re.search(r"'resource_id': u?'([^']+)'",
job.description).groups()[0]
for job in get_queue().get_jobs()
if 'xloader_to_datastore' in str(job) # filter out test_job etc
Expand Down Expand Up @@ -143,6 +146,10 @@ def xloader_submit(context, data_dict):

context['ignore_auth'] = True
context['user'] = '' # benign - needed for ckan 2.5

model = context['model']
original_session = model.Session
model.Session = model.meta.create_local_session()
p.toolkit.get_action('task_status_update')(context, task)

data = {
Expand All @@ -168,6 +175,7 @@ def xloader_submit(context, data_dict):
job = _enqueue(jobs.xloader_data_into_datastore, [data], timeout=timeout)
except Exception:
        log.exception('Unable to enqueue xloader res_id=%s', res_id)
model.Session = original_session
return False
log.debug('Enqueued xloader job=%s res_id=%s', job.id, res_id)

Expand All @@ -177,6 +185,7 @@ def xloader_submit(context, data_dict):
task['state'] = 'pending'
task['last_updated'] = str(datetime.datetime.utcnow()),
p.toolkit.get_action('task_status_update')(context, task)
model.Session = original_session

return True

Expand Down
17 changes: 9 additions & 8 deletions ckanext/xloader/cli.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from __future__ import print_function
import sys
import logging

Expand Down Expand Up @@ -62,7 +63,7 @@ def __init__(self, name):

def command(self):
if not self.args:
print self.usage
print(self.usage)
sys.exit(1)
if self.args[0] == 'submit':
if len(self.args) < 2:
Expand Down Expand Up @@ -115,7 +116,7 @@ def _confirm_or_abort(self):
)
answer = cli.query_yes_no(question, default=None)
if not answer == 'yes':
print "Aborting..."
print("Aborting...")
sys.exit(0)

def _submit_all_existing(self):
Expand Down Expand Up @@ -281,23 +282,23 @@ def command(self):
def _migrate_all(self):
session = model.Session
resource_count = session.query(model.Resource).filter_by(state='active').count()
print "Updating {} resource(s)".format(resource_count)
print("Updating {} resource(s)".format(resource_count))
resources_done = 0
for resource in session.query(model.Resource).filter_by(state='active'):
resources_done += 1
self._migrate_resource(resource.id,
prefix='[{}/{}]: '.format(resources_done,
resource_count))
if resources_done % 100 == 0:
print "[{}/{}] done".format(resources_done, resource_count)
print "[{}/{}] done".format(resources_done, resource_count)
print("[{}/{}] done".format(resources_done, resource_count))
print("[{}/{}] done".format(resources_done, resource_count))

def _migrate_resource(self, resource_id, prefix=''):
data_dict = h.datastore_dictionary(resource_id)

def print_status(status):
if self.options.verbose:
print "{}{}: {}".format(prefix, resource_id, status)
print("{}{}: {}".format(prefix, resource_id, status))

if not data_dict:
print_status("not found")
Expand Down Expand Up @@ -333,9 +334,9 @@ def print_status(status):
'fields': fields
})
print_status("updated")
except Exception, e:
except Exception as e:
self.error_occured = True
print "{}: failed, {}".format(resource_id, e)
print("{}: failed, {}".format(resource_id, e))

def _handle_command_status(self):
if self.error_occured:
Expand Down
47 changes: 2 additions & 45 deletions ckanext/xloader/controllers.py
Original file line number Diff line number Diff line change
@@ -1,50 +1,7 @@
import ckan.plugins as p

_ = p.toolkit._
import ckanext.xloader.utils as utils


class ResourceDataController(p.toolkit.BaseController):

def resource_data(self, id, resource_id):

if p.toolkit.request.method == 'POST':
try:
p.toolkit.c.pkg_dict = \
p.toolkit.get_action('xloader_submit')(
None, {'resource_id': resource_id}
)
except p.toolkit.ValidationError:
pass

p.toolkit.redirect_to(
controller='ckanext.xloader.controllers:ResourceDataController',
action='resource_data',
id=id,
resource_id=resource_id
)

try:
p.toolkit.c.pkg_dict = p.toolkit.get_action('package_show')(
None, {'id': id}
)
p.toolkit.c.resource = p.toolkit.get_action('resource_show')(
None, {'id': resource_id}
)
except (p.toolkit.ObjectNotFound, p.toolkit.NotAuthorized):
p.toolkit.abort(404, _('Resource not found'))

try:
xloader_status = p.toolkit.get_action('xloader_status')(
None, {'resource_id': resource_id}
)
except p.toolkit.ObjectNotFound:
xloader_status = {}
except p.toolkit.NotAuthorized:
p.toolkit.abort(403, _('Not authorized to see this page'))

return p.toolkit.render('xloader/resource_data.html',
extra_vars={
'status': xloader_status,
'resource': p.toolkit.c.resource,
'pkg_dict': p.toolkit.c.pkg_dict,
})
return utils.resource_data(id, resource_id)
39 changes: 20 additions & 19 deletions ckanext/xloader/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import datetime
import json

import six
import sqlalchemy


Expand Down Expand Up @@ -108,7 +109,7 @@ def get_job(job_id):
# Avoid SQLAlchemy "Unicode type received non-unicode bind param value"
# warnings.
if job_id:
job_id = unicode(job_id)
job_id = six.text_type(job_id)

result = ENGINE.execute(
JOBS_TABLE.select().where(JOBS_TABLE.c.job_id == job_id)).first()
Expand All @@ -118,7 +119,7 @@ def get_job(job_id):

# Turn the result into a dictionary representation of the job.
result_dict = {}
for field in result.keys():
for field in list(result.keys()):
value = getattr(result, field)
if value is None:
result_dict[field] = value
Expand All @@ -127,7 +128,7 @@ def get_job(job_id):
elif isinstance(value, datetime.datetime):
result_dict[field] = value.isoformat()
else:
result_dict[field] = unicode(value)
result_dict[field] = six.text_type(value)

result_dict['metadata'] = _get_metadata(job_id)
result_dict['logs'] = _get_logs(job_id)
Expand Down Expand Up @@ -178,14 +179,14 @@ def add_pending_job(job_id, job_type, api_key,
# Turn strings into unicode to stop SQLAlchemy
# "Unicode type received non-unicode bind param value" warnings.
if job_id:
job_id = unicode(job_id)
job_id = six.text_type(job_id)
if job_type:
job_type = unicode(job_type)
job_type = six.text_type(job_type)
if result_url:
result_url = unicode(result_url)
result_url = six.text_type(result_url)
if api_key:
api_key = unicode(api_key)
data = unicode(data)
api_key = six.text_type(api_key)
data = six.text_type(data)

if not metadata:
metadata = {}
Expand All @@ -205,16 +206,16 @@ def add_pending_job(job_id, job_type, api_key,
# Insert any (key, value) metadata pairs that the job has into the
# metadata table.
inserts = []
for key, value in metadata.items():
for key, value in list(metadata.items()):
type_ = 'string'
if not isinstance(value, basestring):
if not isinstance(value, six.string_types):
value = json.dumps(value)
type_ = 'json'

# Turn strings into unicode to stop SQLAlchemy
# "Unicode type received non-unicode bind param value" warnings.
key = unicode(key)
value = unicode(value)
key = six.text_type(key)
value = six.text_type(value)

inserts.append(
{"job_id": job_id,
Expand Down Expand Up @@ -261,12 +262,12 @@ def _validate_error(error):
"""
if error is None:
return None
elif isinstance(error, basestring):
elif isinstance(error, six.string_types):
return {"message": error}
else:
try:
message = error["message"]
if isinstance(message, basestring):
if isinstance(message, six.string_types):
return error
else:
raise InvalidErrorObjectError(
Expand All @@ -291,19 +292,19 @@ def _update_job(job_id, job_dict):
# Avoid SQLAlchemy "Unicode type received non-unicode bind param value"
# warnings.
if job_id:
job_id = unicode(job_id)
job_id = six.text_type(job_id)

if "error" in job_dict:
job_dict["error"] = _validate_error(job_dict["error"])
job_dict["error"] = json.dumps(job_dict["error"])
# Avoid SQLAlchemy "Unicode type received non-unicode bind param value"
# warnings.
job_dict["error"] = unicode(job_dict["error"])
job_dict["error"] = six.text_type(job_dict["error"])

# Avoid SQLAlchemy "Unicode type received non-unicode bind param value"
# warnings.
if "data" in job_dict:
job_dict["data"] = unicode(job_dict["data"])
job_dict["data"] = six.text_type(job_dict["data"])

ENGINE.execute(
JOBS_TABLE.update()
Expand Down Expand Up @@ -448,7 +449,7 @@ def _get_metadata(job_id):
"""Return any metadata for the given job_id from the metadata table."""
# Avoid SQLAlchemy "Unicode type received non-unicode bind param value"
# warnings.
job_id = unicode(job_id)
job_id = six.text_type(job_id)

results = ENGINE.execute(
METADATA_TABLE.select().where(
Expand All @@ -466,7 +467,7 @@ def _get_logs(job_id):
"""Return any logs for the given job_id from the logs table."""
# Avoid SQLAlchemy "Unicode type received non-unicode bind param value"
# warnings.
job_id = unicode(job_id)
job_id = six.text_type(job_id)

results = ENGINE.execute(
LOGS_TABLE.select().where(LOGS_TABLE.c.job_id == job_id)).fetchall()
Expand Down
Loading

0 comments on commit 3092d82

Please sign in to comment.