Add schema validator, closes #8.
pwalsh committed Jan 28, 2015
1 parent 974c89c commit f959b8e
Showing 17 changed files with 1,182 additions and 542 deletions.
1 change: 1 addition & 0 deletions requirements/test.txt
@@ -3,3 +3,4 @@
coverage
pytest
pytest-cov
git+https://github.com/mverteuil/pytest-ipdb.git
137 changes: 79 additions & 58 deletions tabular_validator/pipeline/pipeline.py
@@ -4,9 +4,13 @@
from __future__ import print_function
from __future__ import unicode_literals

import os
import io
import importlib
from ..validators import SpecValidator
from .. import utilities
import shutil
import tempfile
import json
from ..utilities import data_table, data_package, csv_dialect, helpers
from .. import exceptions


@@ -15,24 +19,23 @@ class ValidationPipeline(object):
"""Validate a (tabular) data source through a validation pipeline.
Args:
validators: A list of validator names to process `data_source`
* Each name can be a 'shortname' for the default validators
* e.g., ['structure', 'schema', 'probe']
* Each name can be a string path to a validator
* e.g., ['custompackage.CustomValidator', 'schema']
* Custom validator must implement the Validator API
data_package: A stream, filepath, string or URL for a Data Package spec
data_source: A buffer, filepath, string or URL to the table data
data_format: The format of `data_source`. 'csv' or 'json'
table_schema: A buffer, filepath, string or URL for a JSON Table Schema
csv_dialect: A buffer, filepath, string or URL for a CSV dialect spec
options: a dict configuration object for the validation pipeline
* Each validator has its options nested under its 'shortname'
* Custom validators have options nested under a lower case string
of the Class name
* e.g.:
{'structure': {#options}, 'customvalidator': {#options}}
* validators: A list of validator names to process `data_source`
* Each name can be a 'shortname' for the default validators
* e.g., ['structure', 'tableschema']
* Each name can be a string path to a validator
* e.g., ['custompackage.CustomValidator', 'schema']
* Custom validator must implement the Validator API
* data_package_source: A stream, filepath, string or URL to a Data Package
* data_source: A buffer, filepath, string or URL to the table data
* data_format: The format of `data_source`. 'csv' or 'json'
* csv_dialect_source: A buffer, filepath, string or URL to a CSV dialect spec
* options: a dict configuration object for the validation pipeline
* Each validator has its options nested under its 'shortname'
* Custom validators have options nested under cls.__name__.lower()
* e.g.:
{'structure': {#options}, 'customvalidator': {#options}}
* workspace: path to directory for files. e.g.: '/my/path'
* dry_run: No files are persisted after the run has been completed
Returns:
A tuple of `valid, report`, where `valid` is a boolean expressing
@@ -42,41 +45,56 @@ class ValidationPipeline(object):
"""

def __init__(self, validators=None, data_package=None, data_source=None,
data_format='csv', table_schema=None, csv_dialect=None,
options=None, job_id=None, workspace=None, dry_run=None):

# TODO: Handle data_format (CSV, JSON)
# TODO: Pass csv_dialect to the table constructor
# TODO: Handle data_package and everything that means
# TODO: Handle cases where validators arguments are not valid
# TODO: Ensure that options looks legit

# TODO: Support job_id
# TODO: Support workspace
# TODO: Support dry_run
def __init__(self, validators=None, data_source=None,
data_package_source=None, csv_dialect_source=None,
data_format='csv', options=None,
workspace=None, dry_run=None):

self.validators = validators
self.data_package = data_package
self.data_source = data_source
self.data_format = data_format
self.table_schema = table_schema
self.csv_dialect = csv_dialect
self.options = options or {}
self.workspace = workspace or tempfile.mkdtemp()
self.dry_run = dry_run
self.openfiles = []

# Check that any/all spec files are validly formed
valid = self.validate_spec()
if not valid:
raise exceptions.InvalidSpec
# data package source
if data_package_source is not None:
_valid, self.data_package = data_package.validate(
data_package_source)
if not _valid:
raise exceptions.InvalidSpec
else:
if not self.dry_run:
self.write_file(json.dumps(self.data_package),
'data_package.json')
else:
self.data_package = None

# csv dialect source
if csv_dialect_source is not None:
_valid, self.csv_dialect = csv_dialect.validate(
csv_dialect_source)
if not _valid:
raise exceptions.InvalidSpec
else:
if not self.dry_run:
self.write_file(json.dumps(self.csv_dialect),
'csv_dialect.json')
else:
self.csv_dialect = None

self.table = utilities.DataTable(data_source)
# data source
self.table = data_table.DataTable(data_source)
self.openfiles.extend(self.table.openfiles)
self.report = {}

self.builtins = utilities.builtin_validators()
# container for validator reports
# TODO: Reimplement this as a 'meta' reporter.Report instance
# i.e.: it should be an interface over yaml or sql backend, not dict
self.report = {}

# instantiate all the validators in the pipeline with options.
self.builtins = helpers.builtin_validators()
if validators:
self.pipeline = []
for v in validators:
@@ -85,27 +103,22 @@ def __init__(self, validators=None, data_package=None, data_source=None,
self.pipeline.append(validator_class(**options))
else:
self.pipeline = [self.builtins[v]() for v in
utilities.DEFAULT_PIPELINE]
helpers.DEFAULT_PIPELINE]

def validate_spec(self):
"""Validate any/all spec files."""
specs = [self.data_package, self.table_schema, self.csv_dialect]
if any(specs):
sv = SpecValidator(data_package=self.data_package,
table_schema=self.table_schema,
csv_dialect=self.csv_dialect)
return sv.run()
def write_file(self, data, name):
"""Write a file to the pipeline workspace."""

return True
filepath = os.path.join(self.workspace, name)
        with io.open(filepath, mode='w+t', encoding='utf-8') as destfile:
destfile.write(data)

def resolve_validator(self, validator_name):
"""Return a validator class."""

if validator_name in self.builtins:
validator_class = self.builtins[validator_name]

else:
# a custom validator
# resolve a custom validator
_module, _class = validator_name.rsplit('.', 1)
try:
validator_class = getattr(importlib.import_module(_module),
@@ -125,18 +138,17 @@ def register_validator(self, validator_name, options=None, position=None):

if position is None:
self.pipeline.append(validator)

else:
self.pipeline.insert(position, validator)

def run(self):
"""Run the validation pipeline."""

# The valid state of the run
# default valid state
valid = True

def _run_valid(process_valid, run_valid):
"""Set/maintain the valid state of the run."""
"""Set/maintain the valid state of this run."""
if not process_valid and run_valid:
return False
return run_valid
@@ -185,8 +197,17 @@ def _run_valid(process_valid, run_valid):
if not _valid and validator.fail_fast:
return valid, self.generate_report()

# `dry_run` tasks
if self.dry_run:
self.rm_workspace()

return valid, self.generate_report()

def rm_workspace(self):
"""Remove this run's workspace from disk."""

return shutil.rmtree(self.workspace)

def generate_report(self):
"""Run the report generator for each validator in the pipeline."""

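For orientation, a minimal sketch of how the reworked pipeline might be driven after this commit; the file paths, option values and use of dry_run below are illustrative assumptions, not part of the diff:

from tabular_validator.pipeline.pipeline import ValidationPipeline

# Hypothetical invocation: 'data.csv' and 'datapackage.json' are made-up paths.
pipeline = ValidationPipeline(
    validators=['structure', 'tableschema'],
    data_source='data.csv',
    data_package_source='datapackage.json',
    options={'structure': {}, 'tableschema': {}},
    dry_run=True)

valid, report = pipeline.run()  # returns (valid, generated report)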
12 changes: 7 additions & 5 deletions tabular_validator/utilities/__init__.py
@@ -4,10 +4,12 @@
from __future__ import print_function
from __future__ import unicode_literals

from .table import DataTable
from .helpers import (builtin_validators, DEFAULT_PIPELINE, report_schema,
load_json_source)
from . import data_table
from . import table_schema
from . import csv_dialect
from . import data_package
from . import helpers


__all__ = ['DataTable', 'builtin_validators', 'DEFAULT_PIPELINE',
'report_schema', 'load_json_source']
__all__ = ['data_table', 'table_schema', 'csv_dialect', 'data_package',
'helpers']
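With this reshuffle, callers that previously imported names from tabular_validator.utilities directly would now go through the sub-modules; roughly (the CSV path below is a made-up example):

from tabular_validator.utilities import data_table, helpers

table = data_table.DataTable('data.csv')   # previously: utilities.DataTable(...)
builtin = helpers.builtin_validators()     # previously exported at package level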
30 changes: 30 additions & 0 deletions tabular_validator/utilities/csv_dialect.py
@@ -0,0 +1,30 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import os
import io
import json
import jsonschema
from . import helpers


def validate(source):
"""Validate a CSV Dialect source file."""

schemafile = os.path.abspath(os.path.join(
os.path.dirname(os.path.dirname(__file__)), 'schemas',
'csv-dialect-description-format.json'))

with io.open(schemafile) as stream:
schema = json.load(stream)

try:
source = helpers.load_json_source(source)
jsonschema.validate(source, schema)
        return True, source

except (jsonschema.ValidationError, ValueError, TypeError):
        return False, None
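As a rough usage sketch of the new module, assuming validate reports the (valid, parsed) pair that pipeline.py unpacks, and with 'dialect.json' standing in for a hypothetical file conforming to the CSV Dialect Description Format:

from tabular_validator.utilities import csv_dialect

valid, dialect = csv_dialect.validate('dialect.json')  # hypothetical path
if not valid:
    raise SystemExit('CSV dialect spec did not validate')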
34 changes: 34 additions & 0 deletions tabular_validator/utilities/data_package.py
@@ -0,0 +1,34 @@
# -*- coding: utf-8 -*-
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
from __future__ import unicode_literals

import os
import io
import json
import jsonschema
from . import helpers


def validate(source):
"""Validate a Data Package source file.
Args:
    * source: A stream, filepath, string or URL to a Data Package descriptor
"""

schemafile = os.path.abspath(os.path.join(
os.path.dirname(os.path.dirname(__file__)), 'schemas',
'data-package.json'))

with io.open(schemafile) as stream:
schema = json.load(stream)

try:
source = helpers.load_json_source(source)
jsonschema.validate(source, schema)
        return True, source

except (jsonschema.ValidationError, ValueError, TypeError):
return False, None
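A similar hedged sketch for the Data Package helper, again assuming the (valid, descriptor) return that pipeline.py relies on and a hypothetical 'datapackage.json' on disk:

from tabular_validator.utilities import data_package

valid, descriptor = data_package.validate('datapackage.json')
if valid:
    print(descriptor)  # the loaded Data Package descriptor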
@@ -17,20 +17,19 @@ class DataTable(object):

REMOTE_SCHEMES = ('http', 'https', 'ftp', 'ftps')

def __init__(self, data_source, headers=None):
def __init__(self, data_source, headers=None, filepath=None):

self.openfiles = []
self.filepath = filepath
self.stream = self.to_textstream(data_source)
self.headers, self.values = self.extract(headers)

def extract(self, headers=None):
"""Extract headers and values from the data stream."""

# TODO: Support headers at any index, with any delimiter and associated
headers = headers or self.get_headers(self.stream.readline())
# TODO: json, accept hints for start of stream data, etc.
values = csv.reader(self.stream)
# reset the stream for others to possibly consume
self.stream.seek(0)
values = csv.reader(self.stream, quotechar="'")

return headers, values

def to_dict(self):
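A small sketch of the DataTable behaviour shown in the hunk above (presumably tabular_validator/utilities/data_table.py, given the imports elsewhere in this commit); 'data.csv' is a made-up path:

from tabular_validator.utilities import data_table

table = data_table.DataTable('data.csv')  # hypothetical file
print(table.headers)                      # headers pulled from the first line
for row in table.values:                  # csv.reader over the re-wound stream
    print(row)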
15 changes: 7 additions & 8 deletions tabular_validator/utilities/helpers.py
@@ -7,7 +7,6 @@
import os
import io
import json
import codecs
import requests
from .. import compat

@@ -16,9 +15,8 @@ def builtin_validators():
"""Return dict of public builtin validators. Avoids circular import."""
from .. import validators
return {
'spec': validators.SpecValidator,
'structure': validators.StructureValidator,
'schema': validators.SchemaValidator
validators.StructureValidator.name: validators.StructureValidator,
validators.TableSchemaValidator.name: validators.TableSchemaValidator
}


@@ -31,17 +29,16 @@ def builtin_validators():
'name': {'type': compat.str},
'category': {'type': compat.str},
'level': {'type': compat.str},
'position': {'type': int},
'position': {'type': (int, type(None))},
'message': {'type': compat.str}
}


def load_json_source(source):

"""Load a source, expected to be JSON, into a Python data structure."""
"""Load a JSON source, from string, URL or buffer, into a Python type."""

if source is None:
# consider raising instead of returning None
return None

elif isinstance(source, (dict, list)):
@@ -55,4 +52,6 @@ def builtin_validators():
return json.loads(source)

else:
return json.load(io.open(source, encoding='utf-8'))
with io.open(source, encoding='utf-8') as stream:
            source = json.load(stream)
return source
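Finally, a quick illustration of the two helpers touched here, assuming a raw JSON string takes the json.loads branch shown above and that the builtin validator names match the docstring example ('structure', 'tableschema'):

from tabular_validator.utilities import helpers

helpers.load_json_source('{"fields": []}')   # -> {'fields': []}
helpers.load_json_source({'fields': []})     # dict/list passed through unchanged
helpers.load_json_source(None)               # -> None
sorted(helpers.builtin_validators())         # -> e.g. ['structure', 'tableschema']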
