A tool to validate data in Spark
Retrieving official releases via direct download or Maven-compatible dependency retrieval, e.g. spark-submit
You can make the jars available in one of two ways for the example run invocations below:

- Get the latest version from GitHub Packages for the project. Place the jars somewhere and pass their path to `--jars` when running `spark-submit`.
- Pull in the dependency using `spark-submit`'s `--repositories`, `--packages`, and `--class` options. This requires setting `spark.jars.ivySettings` and providing the following Ivy settings file, populated with a valid personal access token that has the `read:packages` scope enabled. N.b. it can be a challenge to secure this file on shared clusters; consider using a public GitHub service account instead of a token from your own personal GitHub account.

```xml
<ivysettings>
  <settings defaultResolver="thechain">
    <credentials
        host="maven.pkg.github.com"
        realm="GitHub Package Registry"
        username="${GITHUB_PACKAGES_USER}"
        passwd="${GITHUB_PACKAGES_USER_TOKEN}" />
  </settings>
  <resolvers>
    <chain name="thechain">
      <ibiblio name="central" m2compatible="true" root="https://repo1.maven.org/maven2" />
      <!-- add any other repositories here -->
      <ibiblio name="ghp-dv" m2compatible="true" root="https://maven.pkg.github.com/target/data-validator"/>
    </chain>
  </resolvers>
</ivysettings>
```
See also How do I add a GitHub Package repository when executing spark-submit --repositories?
See CONTRIBUTING for development environment setup.
Assemble the fat jar with `make build` or `sbt clean assembly`.
```
$ spark-submit --master local data-validator-assembly-0.14.1.jar --help

data-validator v0.14.1
Usage: data-validator [options]

  --version
  --verbose                Print additional debug output.
  --config <value>         required validator config .yaml filename, prefix w/ 'classpath:' to load configuration from JVM classpath/resources, ex. '--config classpath:/config.yaml'
  --jsonReport <value>     optional JSON report filename
  --htmlReport <value>     optional HTML report filename
  --vars k1=v1,k2=v2...    other arguments
  --exitErrorOnFail true|false
                           optional when true, if validator fails, call System.exit(-1) Defaults to True, but will change to False in future version.
  --emailOnPass true|false
                           optional when true, sends email on validation success. Default: false
  --help                   Show this help message and exit.
```
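The `--vars` option takes a comma-separated list of `key=value` pairs. As a rough sketch of how such a string maps onto variables, assuming values contain no commas and each pair is split on its first `=` (the object name `VarsArgSketch` is hypothetical, and this is not the tool's actual command-line parser):

```scala
// Hypothetical sketch of turning a `--vars k1=v1,k2=v2` argument into a Map.
// Not data-validator's parser; assumes values never contain commas.
object VarsArgSketch {
  def parse(arg: String): Map[String, String] =
    arg
      .split(",")
      .iterator
      .filter(_.nonEmpty) // tolerate a trailing comma
      .map { pair =>
        // Split on the first '=' only, so a value may itself contain '='.
        pair.split("=", 2) match {
          case Array(key, value) => key.trim -> value
          case _ => throw new IllegalArgumentException(s"expected key=value, got '$pair'")
        }
      }
      .toMap
}
```

For example, `ENV=prod,SMTP_HOST=smtp.example.com` yields two variables; a value that needs a literal comma would not survive this simple splitting.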
If you want to build with Java 11 or newer, set the "MODERN_JAVA" environment variable. This may become the default in the future.
With the JAR directly:

```shell
spark-submit \
  --num-executors 10 \
  --executor-cores 2 \
  data-validator-assembly-0.14.1.jar \
  --config config.yaml \
  --jsonReport report.json
```
Using package loading via `--packages`, after creating `dv-ivy.xml` as suggested above and replacing the placeholders in it. Because `spark-submit` requires a primary application resource, an empty placeholder file takes the place of the jar:

```shell
touch empty.file && \
spark-submit \
  --class com.target.data_validator.Main \
  --packages com.target:data-validator_2.11:0.14.1 \
  --conf spark.jars.ivySettings=$(pwd)/dv-ivy.xml \
  empty.file \
  --config config.yaml \
  --jsonReport report.json
```
See the Example Config below for the contents of `config.yaml`.
The data-validator config file is YAML-based and has three sections: Global Settings, Table Sources, and Validators. The Table Sources and Validators sections can use variables in their configuration. These variables are replaced at runtime with the values set in the Global Settings section or with the `--vars` option on the command line. Variables start with `$` and must contain a word starting with a letter (`A-Za-z`), followed by zero or more letters (`A-Za-z`), numbers (`0-9`), or underscores. Variables can optionally be wrapped in `{` `}`, i.e. `$foo` or `${foo}`. See the code for the regular expression used to find them in a string. All the table sources, and all but one validator (`rowCount`), support variables in their configuration parameters. Note: Care must be taken with some substitutions; some possible values might require quoting the variables in the config.
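To make the naming rule concrete, here is a sketch of a regex that follows the description above and a substitution helper built on it. This is not the project's actual regular expression or code (see the source for that); `VarSubstitutionSketch` is a hypothetical name, and leaving unknown variables untouched is an assumption of the sketch.

```scala
import scala.util.matching.Regex

// Hypothetical sketch; NOT the project's actual regex or substitution code.
object VarSubstitutionSketch {
  // Either `${name}` (group 1) or bare `$name` (group 2), where name is a letter
  // followed by zero or more letters, digits, or underscores.
  val VarPattern: Regex = """\$\{([A-Za-z][A-Za-z0-9_]*)\}|\$([A-Za-z][A-Za-z0-9_]*)""".r

  /** Replaces each variable in `s` with its value from `vars`; unknown variables are left as-is. */
  def substitute(s: String, vars: Map[String, String]): String =
    VarPattern.replaceAllIn(s, (m: Regex.Match) => {
      val name = Option(m.group(1)).getOrElse(m.group(2))
      // quoteReplacement keeps `$` and `\` in values from being read as group references.
      Regex.quoteReplacement(vars.getOrElse(name, m.matched))
    })
}
```

It also shows why the braced form is useful: in `$ENV_x` the whole of `ENV_x` is read as the variable name, while `${ENV}_x` substitutes only `ENV`.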
The first section contains the global settings used throughout the program.
| Variable | Type | Required | Description |
|---|---|---|---|
| `numKeyCols` | Int | Yes | The number of columns from the table schema to use to uniquely identify a row in the table. |
| `numErrorsToReport` | Int | Yes | The number of detailed errors to include in the Validator Report. |
| `detailedErrors` | Boolean | Yes | If a check fails, run a second pass and gather `numErrorsToReport` examples of failure. |
| `email` | EmailConfig | No | See Email Config. |
| `vars` | Map | No | A map of (key, value) pairs used for variable substitution in the tables config. See the description of variables below. |
| `outputs` | Array | No | Describes where to send the `.json` report. See Validator Output. |
| `tables` | List | Yes | List of table sources used to load tables to validate. |
The `email` section (Email Config) accepts the following fields:

| Variable | Type | Required | Description |
|---|---|---|---|
| `smtpHost` | String | Yes | The SMTP host to send the email message through. |
| `subject` | String | Yes | Subject for the email message. |
| `from` | String | Yes | Email address to appear in the From field of the message. |
| `to` | Array[String] | Yes | At least one email address to send the email report to. |
| `cc` | Array[String] | No | Optional list of email addresses to send the message to via the Cc field. |
| `bcc` | Array[String] | No | Optional list of email addresses to send the message to via the Bcc field. |
Note that Data Validator only sends email on failure by default. To send email even on successful runs, pass `--emailOnPass true` on the command line.
There are four types of variables you can specify: simple, environment, shell, and SQL.

Simple variables are specified by `name` and `value` pairs and are very straightforward.

```yaml
vars:
  - name: ENV
    value: prod
```

This sets the variable `ENV` to the value `prod`.
Environment variables import the value from the operating system:

```yaml
vars:
  - name: JAVA_DIR
    env: JAVA_HOME
```

This sets the variable `JAVA_DIR` to the value returned by `System.getenv("JAVA_HOME")`. If `JAVA_HOME` does not exist in the system environment, the data-validator stops processing and exits with an error.
A shell variable takes the first line of output from a shell command and stores it in a variable.

```yaml
vars:
  - name: NEXT_SATURDAY
    shell: date -d "next saturday" +"%Y-%m-%d"
```

This sets the variable `NEXT_SATURDAY` to the first line of output from the shell command `date -d "next saturday" +"%Y-%m-%d"` (GNU `date` syntax).
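The environment and shell variable behavior described above can be sketched in Scala as follows. This is an illustration only: `VarResolutionSketch`, `fromEnv`, and `fromShell` are hypothetical names, errors are modeled as `Left` values rather than the tool's exit-with-error behavior, and the shell command is assumed to run through `sh -c`.

```scala
import scala.sys.process._
import scala.util.{Failure, Success, Try}

// Hypothetical sketch, not data-validator's implementation.
object VarResolutionSketch {
  /** Environment variable: a missing name is an error (the real tool stops and exits). */
  def fromEnv(name: String, env: Map[String, String] = sys.env): Either[String, String] =
    env.get(name).toRight(s"environment variable $name is not set")

  /** Shell variable: run `command` via `sh -c` (an assumption) and keep the first line of stdout. */
  def fromShell(command: String): Either[String, String] =
    Try(Seq("sh", "-c", command).!!) match { // `!!` throws on a non-zero exit status
      case Success(out) => Right(out.split("\n").headOption.getOrElse(""))
      case Failure(e)   => Left(s"command failed: ${e.getMessage}")
    }
}
```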
A SQL variable takes the first column of the first row of the results of a Spark SQL statement.

```yaml
vars:
  - name: MAX_AGE
    sql: select max(age) from census_income.adult
```

This runs the SQL query that gets the max value of the column `age` from the table `adult` in the `census_income` database and stores it in `MAX_AGE`.
In addition to the `--jsonReport` command-line option, the `.yaml` has an `outputs` section that directs the `.json` event report to a file or pipes it to a program. There is currently no limit on the number of outputs.

```yaml
outputs:
  - filename: /user/home/sample.json
    append: true
```
If the `filename` specified begins with `/` or `local:///`, the report is written to the local filesystem. If the filename begins with `hdfs://`, the report is written to the HDFS path. An optional `append` boolean can be specified; if it is `true`, the current report is appended to the end of the specified file. The default is `append: false`, and the file is overwritten. The `filename` supports variable substitution; the optional `append` does not. Before the validator starts processing tables, it checks that it can create or append to the `filename`; if it cannot, the data validator exits with an error (non-zero value).
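The filename rules above amount to a prefix check. A minimal sketch, where `OutputTargetSketch`, `LocalFile`, and `HdfsFile` are hypothetical names and any other prefix is reported as unsupported (the README does not say what happens in that case):

```scala
// Hypothetical sketch of the filename prefix rules; not data-validator's code.
object OutputTargetSketch {
  sealed trait Target
  final case class LocalFile(path: String) extends Target
  final case class HdfsFile(uri: String) extends Target

  /** Classifies an `outputs` filename; None means a prefix this sketch does not handle. */
  def classify(filename: String): Option[Target] =
    if (filename.startsWith("local:///")) Some(LocalFile(filename.stripPrefix("local://")))
    else if (filename.startsWith("/")) Some(LocalFile(filename))
    else if (filename.startsWith("hdfs://")) Some(HdfsFile(filename))
    else None
}
```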
```yaml
outputs:
  - pipe: /path/to/program
    ignoreError: true
```

A `pipe` sends the `.json` event report to another program for processing. This is a powerful feature that can enable the data-validator to be integrated with virtually any other system. An optional `ignoreError` boolean can also be specified; if `true`, the exit value of the program is ignored. If `false` (the default) and the program exits with a non-zero status, the data-validator fails. The `pipe` supports variable substitution; the optional `ignoreError` does not.

Before the validator starts processing tables, it checks whether the `pipe` program is executable; if it is not, the data-validator exits with an error (non-zero value). The program must be on a local filesystem to be executed.
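The pipe contract can be sketched with `scala.sys.process`: the report goes to the program's standard input, and a non-zero exit status counts as a failure unless `ignoreError` is set. `PipeOutputSketch`, `isRunnable`, and `sendReport` are hypothetical names; this is not data-validator's implementation.

```scala
import java.io.{ByteArrayInputStream, File}
import java.nio.charset.StandardCharsets
import scala.sys.process._

// Hypothetical sketch of the `pipe` output contract; not data-validator's code.
object PipeOutputSketch {
  /** Pre-flight check: the program must be a local file we are allowed to execute. */
  def isRunnable(program: String): Boolean = {
    val f = new File(program)
    f.isFile && f.canExecute
  }

  /** Writes `reportJson` to the program's stdin and reports whether the output step succeeded. */
  def sendReport(program: String, reportJson: String, ignoreError: Boolean): Boolean = {
    val stdin = new ByteArrayInputStream(reportJson.getBytes(StandardCharsets.UTF_8))
    val exitCode = (Process(Seq(program)) #< stdin).! // blocks until the program exits
    ignoreError || exitCode == 0
  }
}
```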
Table sources specify how to load the tables to be validated. Supported sources include Hive tables, ORC files, Parquet files, and any data loadable through Spark's DataFrameReader, each described below. Each table source has three common arguments, `keyColumns`, `condition`, and `checks`, plus its own source-specific argument(s). The `keyColumns` are a list of columns that can be used to uniquely identify a row in the table for the detailed error report when a validator fails. The `condition` lets the user specify a snippet of SQL to pass to the `WHERE` clause. The `checks` argument is a list of validators to run on this table.
To validate a Hive table, specify the `db` and the `table`:

```yaml
- db: $DB
  table: table_name
  condition: "col1 < 100"
  keyColumns:
    - col1
    - col2
  checks:
```
To validate an `.orc` file, specify `orcFile` and the path to the file:

```yaml
- orcFile: /path/to/orc/file
  keyColumns:
    - col1
    - col2
  checks:
```
To validate a `.parquet` file, specify `parquetFile` and the path to the file:

```yaml
- parquetFile: /path/to/parquet/file
  keyColumns:
    - col1
    - col2
  checks:
```
To validate data loadable by the Spark DataFrameReader fluent API, use something like this:

```yaml
# Some systems require a special format
- format: llama
  # You can also pass any valid options
  options:
    maxMemory: 8G
  # These strings are passed to the varargs version of DataFrameReader.load(String*)
  # If omitted, then DV will call DataFrameReader.load() without parameters.
  # The DataSource that Spark loads is expected to know how to handle this.
  loadData:
    - /path/to/something/camelid.llama
  keyColumns:
    - col1
    - col2
  condition: "col1 < 100"
  checks:
```
Under the hood, this is roughly equivalent to loading a DataFrame with:

```scala
spark.read
  .format("llama")
  .option("maxMemory", "8G")
  .load("/path/to/something/camelid.llama")
```
The third section contains the validators. To specify a validator, first specify its type, then the arguments for that validator. Some validators support an error threshold. This option lets the user specify the number or percentage of errors they can tolerate; in some use cases, it might not be possible to eliminate all errors in the data.
Thresholds can be specified as an absolute number of errors or as a percentage of the row count.

- If the threshold is `>= 1`, it is considered an absolute number of errors. For example, `1000` would fail the check if more than 1000 rows failed the check.
- If the threshold is `< 1`, it is considered a fraction of the row count. For example, `0.25` would fail the check if more than `rowCount * 0.25` of the rows fail the check.
- If the threshold ends in `%`, it is considered a percentage of the row count. For example, `33%` would fail the check if more than `rowCount * 0.33` of the rows fail the check.
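The three threshold forms can be expressed as a small function that computes how many failing rows are tolerated. This is a sketch only: `ThresholdSketch`, `allowedErrors`, and `fails` are hypothetical names, and parsing details such as trimming whitespace are assumptions rather than the tool's code.

```scala
// Hypothetical sketch of the threshold rules above; not data-validator's parser.
object ThresholdSketch {
  /** Number of failing rows tolerated for a threshold string and a table row count. */
  def allowedErrors(threshold: String, rowCount: Long): Double = {
    val t = threshold.trim
    if (t.endsWith("%")) rowCount * t.dropRight(1).trim.toDouble / 100.0 // percentage of rows
    else {
      val v = t.toDouble
      if (v >= 1) v       // ">= 1": absolute number of errors
      else rowCount * v   // "< 1": fraction of the row count
    }
  }

  /** A check fails when more rows fail than the threshold allows. */
  def fails(threshold: String, rowCount: Long, errorCount: Long): Boolean =
    errorCount > allowedErrors(threshold, rowCount)
}
```

With 400 rows, `0.25` tolerates 100 failing rows; with 300 rows, `33%` tolerates 99.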
Currently supported validators are listed below:
Takes two parameters, the `column` name and a `value`. The check fails if `max(column)` is not equal to `value`.

| Arg | Type | Description |
|---|---|---|
| `column` | String | Column within the table to find the max of. |
| `value` | * | The column max should equal this value or the check will fail. Note: The type of `value` should match the type of the column. If the column is a `NumericType`, `value` cannot be a `String`. |
`negativeCheck` takes a single parameter, the column name to check. The validator fails if any row has a negative value in that column.

| Arg | Type | Description |
|---|---|---|
| `column` | String | Table column to be checked for negative values. If it contains a `null`, the validator will fail. Note: The column must be of a `NumericType` or the check will fail during the config check. |
| `threshold` | String | See the description of thresholds above. |
`nullCheck` takes a single parameter, the column name to check. The validator fails if any row has a `null` in that column.

| Arg | Type | Description |
|---|---|---|
| `column` | String | Table column to be checked for `null`. If it contains a `null`, the validator will fail. |
| `threshold` | String | See the description of thresholds above. |
Takes 2 to 4 parameters, described below. If the value in the column doesn't fall within the range specified by (`minValue`, `maxValue`), the check fails.

| Arg | Type | Description |
|---|---|---|
| `column` | String | Table column to be checked. |
| `minValue` | * | Lower bound of the range, or another column in the table. Type depends on the type of the `column`. |
| `maxValue` | * | Upper bound of the range, or another column in the table. Type depends on the type of the `column`. |
| `inclusive` | Boolean | Include `minValue` and `maxValue` as part of the range. |
| `threshold` | String | See the description of thresholds above. |

Note: To specify another column in the table, you must prefix the column name with a backtick (`` ` ``).
`stringLengthCheck` takes 2 to 4 parameters, described in the table below. If the length of the string in the column doesn't fall within the range specified by (`minLength`, `maxLength`), both inclusive, the check fails. At least one of `minLength` or `maxLength` must be specified. The data type of `column` must be String.

| Arg | Type | Description |
|---|---|---|
| `column` | String | Table column to be checked. The DataType of the column must be a String. |
| `minLength` | Integer | Lower bound of the length of the string, inclusive. |
| `maxLength` | Integer | Upper bound of the length of the string, inclusive. |
| `threshold` | String | See the description of thresholds above. |
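A minimal sketch of the per-value length rule, with both bounds inclusive and at least one bound required. `StringLengthSketch` is a hypothetical name, and how the real check treats `null` values is not covered here.

```scala
// Hypothetical sketch of the stringLengthCheck rule; nulls are out of scope here.
object StringLengthSketch {
  /** True when `s` falls outside [minLength, maxLength]; a missing bound is not checked. */
  def violates(s: String, minLength: Option[Int], maxLength: Option[Int]): Boolean = {
    require(minLength.isDefined || maxLength.isDefined, "specify minLength and/or maxLength")
    minLength.exists(s.length < _) || maxLength.exists(s.length > _)
  }

  /** Number of values that would count toward the check's error total. */
  def countViolations(values: Seq[String], minLength: Option[Int], maxLength: Option[Int]): Int =
    values.count(violates(_, minLength, maxLength))
}
```

With `minLength: 1` and `maxLength: 5`, as in the Example Config, a value such as `ENGINEER` (8 characters) would count as a violation.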
`stringRegexCheck` takes 2 to 3 parameters, described in the table below. If the `column` value does not match the pattern specified by `regex`, the check fails. A value for `regex` must be specified. The data type of `column` must be String.

| Arg | Type | Description |
|---|---|---|
| `column` | String | Table column to be checked. The DataType of the column must be a String. |
| `regex` | String | POSIX regex. |
| `threshold` | String | See the description of thresholds above. |
`rowCount` checks the minimum number of rows a table must have to pass the validator.

| Arg | Type | Description |
|---|---|---|
| `minNumRows` | Long | The minimum number of rows a table must have to pass. |

See the Example Config below for how the checks are configured.
This check makes sure all rows in the table are unique; only the specified columns are used to determine uniqueness. It is a costly check that requires an additional pass through the table.

| Arg | Type | Description |
|---|---|---|
| `columns` | Array[String] | Each set of values in these columns must be unique. |
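Conceptually, a uniqueness check groups rows by the listed columns and looks for groups with more than one row. The sketch below does this on an in-memory `Seq` of rows modeled as `Map[String, String]`, a simplification; `UniqueCheckSketch` is a hypothetical name and not the tool's implementation. On a Spark DataFrame the same idea would typically be written as a `groupBy` on the columns followed by `count()` and a filter on the `count` column, an aggregation that illustrates why such a check needs an additional pass.

```scala
// Hypothetical in-memory sketch of a uniqueness check; rows are simplified to Map[String, String].
object UniqueCheckSketch {
  type Row = Map[String, String]

  /** Key tuples (values of `columns`) shared by more than one row, with how many rows share each. */
  def duplicateKeys(rows: Seq[Row], columns: Seq[String]): Map[Seq[Option[String]], Int] =
    rows
      .groupBy(row => columns.map(row.get)) // a missing column becomes None in the key
      .map { case (key, group) => key -> group.size }
      .filter { case (_, count) => count > 1 }
}
```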
This check sums a column over all rows. If the sum of the `column` doesn't fall within the range specified by (`minValue`, `maxValue`), the check fails.

| Arg | Type | Description |
|---|---|---|
| `column` | String | The column to be checked. |
| `minValue` | NumericType | The lower bound of the sum. Type depends on the type of the `column`. |
| `maxValue` | NumericType | The upper bound of the sum. Type depends on the type of the `column`. |
| `inclusive` | Boolean | Include `minValue` and `maxValue` as part of the range. |

Note: If the bounds are non-inclusive and the actual sum is equal to one of the bounds, the relative error percentage will be undefined.
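The bounds test applied to the column sum can be sketched as below. `SumCheckSketch` is a hypothetical name, and the sum is computed over an in-memory `Seq[Double]` rather than a Spark column.

```scala
// Hypothetical sketch of the sum check's bounds test on in-memory values.
object SumCheckSketch {
  /** Inclusive bounds accept a sum equal to a bound; exclusive bounds reject it. */
  def withinBounds(sum: Double, minValue: Double, maxValue: Double, inclusive: Boolean): Boolean =
    if (inclusive) sum >= minValue && sum <= maxValue
    else sum > minValue && sum < maxValue

  /** The check passes when the column sum lies in the configured range. */
  def passes(values: Seq[Double], minValue: Double, maxValue: Double, inclusive: Boolean): Boolean =
    withinBounds(values.sum, minValue, maxValue, inclusive)
}
```

Summing `1, 2, 3` to `6` passes against `[6, 10]` with `inclusive: true` but fails with `inclusive: false`, which is the edge case the note above refers to.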
`colstats` generates statistics about the specified column.

| Arg | Type | Description |
|---|---|---|
| `column` | String | The column on which to collect statistics. |

These keys and their corresponding values appear in the check's JSON summary when using the JSON report output mode:

| Key | Type | Description |
|---|---|---|
| `count` | Integer | Count of non-null entries in the `column`. |
| `mean` | Double | Mean/average of the values in the `column`. |
| `min` | Double | Smallest value in the `column`. |
| `max` | Double | Largest value in the `column`. |
| `stdDev` | Double | Standard deviation of the values in the `column`. |
| `histogram` | Complex | Summary of an equi-width histogram: counts of values appearing in 10 equally sized buckets over the range `[min, max]`. |
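An equi-width histogram with 10 buckets can be sketched as below. Assumptions: `HistogramSketch` is a hypothetical name, values are in-memory doubles with nulls already removed, and the maximum value is placed in the last bucket so the closed range `[min, max]` is covered; the tool's exact bucketing rules may differ.

```scala
// Hypothetical sketch of an equi-width histogram; not data-validator's colstats code.
object HistogramSketch {
  /** Counts of `values` in `buckets` equal-width bins over [min, max]; the max lands in the last bin. */
  def equiWidth(values: Seq[Double], buckets: Int = 10): Vector[Long] = {
    require(values.nonEmpty && buckets > 0)
    val lo = values.min
    val hi = values.max
    val width = (hi - lo) / buckets
    val counts = Array.fill(buckets)(0L)
    values.foreach { v =>
      // All values equal: put everything in the first bucket.
      val idx = if (width == 0) 0 else math.min(((v - lo) / width).toInt, buckets - 1)
      counts(idx) += 1
    }
    counts.toVector
  }
}
```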
Example Config:

```yaml
---
# If keyColumns are not specified for a table, we take the first N columns of a table instead.
numKeyCols: 2

# numErrorsToReport: Number of errors per check shown in the "Error Details" section of the report; this limits the size of the email.
numErrorsToReport: 5

# detailedErrors: If true, a second pass will be made for checks that fail, to gather numErrorsToReport examples with the offending value and keyColumns to aid in debugging.
detailedErrors: true

vars:
  - name: ENV
    value: prod

  - name: JAVA_DIR
    env: JAVA_HOME

  - name: TODAY
    shell: date +"%Y-%m-%d"

  - name: MAX_AGE
    sql: SELECT max(age) FROM census_income.adult

outputs:
  - filename: /user/home/sample.json
    append: true

  - pipe: /path/to/program
    ignoreError: true

email:
  smtpHost: smtp.example.com
  subject: Data Validation Summary
  from: data-validator-no-reply@example.com
  to:
    - person1@example.com
  cc:
    - person2@example.com
    - person3@example.com
  bcc:
    - person4@example.com

tables:
  - db: census_income
    table: adult
    # Key Columns are used when errors occur to identify a row, so they should include enough columns to uniquely identify a row.
    keyColumns:
      - age
      - occupation
    condition: educationNum >= 5
    checks:
      # rowCount - checks if the number of rows is at least minNumRows
      - type: rowCount
        minNumRows: 50000

      # negativeCheck - checks if any values are less than 0
      - type: negativeCheck
        column: age

      # colstats - adds basic statistics of the column to the output
      - type: colstats
        column: age

      # nullCheck - checks if the column is null, counts number of rows with null for this column.
      - type: nullCheck
        column: occupation

      # stringLengthCheck - checks if the length of the string in the column falls within the specified range, counts number of rows in which the length of the string is outside the specified range.
      - type: stringLengthCheck
        column: occupation
        minLength: 1
        maxLength: 5

      # stringRegexCheck - checks if the string in the column matches the pattern specified by `regex`, counts number of rows in which there is a mismatch.
      - type: stringRegexCheck
        column: occupation
        regex: ^ENGINEER$ # matches the word ENGINEER

      - type: stringRegexCheck
        column: occupation
        regex: \w # matches any alphanumeric string
```
The data-validator can be used in an Oozie workflow to halt the workflow if a check doesn't pass. There are two ways to use the data-validator in Oozie, and each has its own drawbacks. The method is selected by the `--exitErrorOnFail {true|false}` command-line option.

The first option, enabled by `--exitErrorOnFail=true`, is to have the data-validator exit with a non-zero value when a check fails. This lets the workflow decide how it wants to handle a failed check or error. The downside of this method is that you can never be sure whether the data-validator exited with an error because of a failed check or because of a problem with the execution of the data-validator itself. It also pollutes the Oozie workflow info with `ERROR`, which some might not like. This is currently the default but is likely to change with v1.0.0.
Example Oozie workflow snippet:

```xml
<action name="RunDataValidator">
  <shell xmlns="uri:oozie:shell-action:0.2">
    <job-tracker>${jobTracker}</job-tracker>
    <name-node>${nameNode}</name-node>
    <exec>spark-submit</exec>
    <argument>--conf</argument>
    <argument>spark.yarn.maxAppAttempts=1</argument>
    <argument>--class</argument>
    <argument>com.target.data_validator.Main</argument>
    <argument>--master</argument>
    <argument>yarn</argument>
    <argument>--deploy-mode</argument>
    <argument>cluster</argument>
    <argument>--keytab</argument>
    <argument>${keytab}</argument>
    <argument>--principal</argument>
    <argument>${principal}</argument>
    <argument>--files</argument>
    <argument>config.yaml</argument>
    <argument>data-validator-assembly-0.14.1.jar</argument>
    <argument>--config</argument>
    <argument>config.yaml</argument>
    <argument>--exitErrorOnFail</argument>
    <argument>true</argument>
    <argument>--vars</argument>
    <argument>ENV=${ENV},EMAIL_REPORT=${EMAIL_REPORT},SMTP_HOST=${SMTP_HOST}</argument>
    <capture-output/>
  </shell>
  <ok to="ValidatorSuccess" />
  <error to="ValidatorErrorOrCheckFail" />
</action>

<action name="ValidatorErrorOrCheckFail">
  <!-- Check or data-validator failed -->
</action>

<action name="ValidatorSuccess">
  <!-- Everything is wonderful! -->
</action>
```
The second option, enabled by `--exitErrorOnFail=false`, is to have the data-validator print `DATA_VALIDATOR_STATUS=PASS` or `DATA_VALIDATOR_STATUS=FAIL` to stdout and call `System.exit(0)` when it completes. This lets the workflow distinguish between a failed check and a runtime error. The downside is that you must use the Oozie shell action with the capture output option and run the validator in Spark's client mode. This will likely become the default behavior in v1.0.0.
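If you drive the validator from a scheduler other than Oozie, the same three-way decision can be sketched by combining the exit status with the `DATA_VALIDATOR_STATUS` line on stdout. `StatusDecisionSketch` and its outcome names are hypothetical, and treating a missing or unrecognized status as a validator error is a design choice of this sketch.

```scala
// Hypothetical sketch of interpreting a run made with --exitErrorOnFail false.
object StatusDecisionSketch {
  sealed trait Outcome
  case object ChecksPassed extends Outcome
  case object ChecksFailed extends Outcome
  case object ValidatorError extends Outcome

  private val Key = "DATA_VALIDATOR_STATUS="

  def decide(exitCode: Int, stdout: String): Outcome = {
    // Find the status line among any other output (trim also drops a trailing '\r').
    val status = stdout.split("\n").map(_.trim).collectFirst {
      case line if line.startsWith(Key) => line.stripPrefix(Key)
    }
    if (exitCode != 0) ValidatorError // runtime problem, not a check result
    else status match {
      case Some("PASS") => ChecksPassed
      case Some("FAIL") => ChecksFailed
      case _            => ValidatorError
    }
  }
}
```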
Example Oozie workflow snippet:

```xml
<action name="RunDataValidator">
  <shell xmlns="uri:oozie:shell-action:0.2">
    <job-tracker>${jobTracker}</job-tracker>
    <name-node>${nameNode}</name-node>
    <exec>spark-submit</exec>
    <argument>--conf</argument>
    <argument>spark.yarn.maxAppAttempts=1</argument>
    <argument>--class</argument>
    <argument>com.target.data_validator.Main</argument>
    <argument>--master</argument>
    <argument>yarn</argument>
    <argument>--deploy-mode</argument>
    <argument>client</argument>
    <argument>--keytab</argument>
    <argument>${keytab}</argument>
    <argument>--principal</argument>
    <argument>${principal}</argument>
    <argument>data-validator-assembly-0.14.1.jar</argument>
    <argument>--config</argument>
    <argument>config.yaml</argument>
    <argument>--exitErrorOnFail</argument>
    <argument>false</argument>
    <argument>--vars</argument>
    <argument>ENV=${ENV},EMAIL_REPORT=${EMAIL_REPORT},SMTP_HOST=${SMTP_HOST}</argument>
    <capture-output/>
  </shell>
  <ok to="ValidatorDecision" />
  <error to="ValidatorFailure" />
</action>

<decision name="ValidatorDecision">
  <switch>
    <case to="ValidatorCheckFail">${wf:actionData('RunDataValidator')['DATA_VALIDATOR_STATUS'] eq "FAIL"}</case>
    <case to="ValidatorCheckPass">${wf:actionData('RunDataValidator')['DATA_VALIDATOR_STATUS'] eq "PASS"}</case>
    <!-- Neither status was printed: treat it as a validator failure -->
    <default to="ValidatorFailure"/>
  </switch>
</decision>

<action name="ValidatorCheckFail">
  <!-- Handle Failed Check -->
</action>

<action name="ValidatorCheckPass">
  <!-- Everything is Wonderful! -->
</action>

<action name="ValidatorFailure">
  <!-- Notify devs of validator failure -->
</action>
```
`com.target.data_validator.ConfigParser` has an entry point that checks that the configuration file is parseable. It does not validate variable substitutions, since those have runtime implications.

```shell
spark-submit \
  --class com.target.data_validator.ConfigParser \
  --files config.yml \
  data-validator-assembly-0.14.1.jar \
  config.yml
```

If there is an error, DV prints a message and exits non-zero.
Data Validator includes a tool to generate a sample `.orc` file for use in local development. This repo's SBT configuration wraps the tool in a convenient SBT task: `sbt generateTestData`. Running the program or task generates a file `testData.orc` in the current directory.

You can then use the following command and config file to test the data-validator. It generates `report.json` and `report.html`.

```shell
spark-submit \
  --master "local[*]" \
  data-validator-assembly-0.14.1.jar \
  --config local_validators.yaml \
  --jsonReport report.json \
  --htmlReport report.html
```

where `local_validators.yaml` contains:

```yaml
---
numKeyCols: 2
numErrorsToReport: 5
detailedErrors: true

tables:
  - orcFile: testData.orc
    checks:
      - type: rowCount
        minNumRows: 1000

      - type: nullCheck
        column: nullCol
```
This tool is based on methods described in Methodology for Data Validation 1.0 by Di Zio et al., published by the ESSnet ValiDat Foundation in 2016. You can download the paper here.