Skip to content

Commit

Permalink
Generate ETLv2 table definitions
Browse files Browse the repository at this point in the history
  • Loading branch information
jpwhite4 committed Dec 6, 2018
1 parent 8bd8d0b commit f402bde
Show file tree
Hide file tree
Showing 3 changed files with 219 additions and 109 deletions.
12 changes: 1 addition & 11 deletions etl/js/lib/etl_profile.js
Original file line number Diff line number Diff line change
Expand Up @@ -304,18 +304,8 @@ ETLProfile.prototype.createOutputTables = function (outputmode) {
//todo: create a table for etl profiles
//todo: create a table to hold a list of all datasets across all etl profiles
var poolConnectionMax = 2;
var tables = this.getTables();

var allStatements = [];
//create the dynamic tables
for (var t in tables) {
var table = tables[t];
self.emit('message', 'Processing table: ' + table.name);
var createStatement = table.getCreateTableStatement();
allStatements = allStatements.concat(createStatement);
var createErrorTableStatement = table.getCreateErrorTableStatement();
allStatements.push(createErrorTableStatement);
}
// create/populate dimension tables
for (var t in this.schema.dimension_tables) {
var table_defn = this.schema.dimension_tables[t];
Expand Down Expand Up @@ -449,7 +439,7 @@ ETLProfile.prototype.aggregate = function () {
var self = this;
try {
if (this.output.dbEngine === 'mysqldb') {
etlv2.generateAggregates(this, config.xdmodBuildConfigDir);
etlv2.generateDefinitionFiles(this, config.xdmodBuildConfigDir);
} else {
//other db engines are not yet supported; this class can be specialized
//or modularized at that time.
Expand Down
105 changes: 104 additions & 1 deletion etl/js/lib/etlv2.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,87 @@ var generateJobListTableIdentifier = function (table) {


module.exports = {

/**
* Create an ETLv2 table definition for the provided dynamic table.
*
* @param table the DynamicTable to process.
* @param isErrorTable whether to create the definition of the associated error table
* or the fact table.
*
* @return [Object] the ETLv2 table definition.
*/
createDynamicTableDefinition: function (table, isErrorTable) {
var tableDefinition = {
name: table.name,
engine: 'MyISAM',
comment: '',
columns: []
};
var tableColumns = table.getDynamicTableFields(isErrorTable);

if (isErrorTable) {
tableDefinition.name += '_errors';
}

for (let i = 0; i < tableColumns.length; i++) {
let definition = {
name: tableColumns[i].name,
type: sqlType(tableColumns[i].type, tableColumns[i].length),
nullable: tableColumns[i].nullable,
comment: tableColumns[i].comments
};
if (tableColumns[i].hasOwnProperty('def')) {
definition.default = tableColumns[i].def;
}
if (tableColumns[i].hasOwnProperty('extra')) {
definition.extra = tableColumns[i].extra;
}
tableDefinition.columns.push(definition);
}

tableDefinition.indexes = [{
name: 'PRIMARY',
columns: [
tableDefinition.columns[0].name
],
type: 'BTREE',
is_unique: true
}];

if (!isErrorTable) {
if (table.meta.unique) {
tableDefinition.indexes.push({
name: 'pk_index',
columns: table.meta.unique,
type: 'BTREE',
is_unique: true
});
}
}

for (let i = 0; i < table.meta.extras.length; i++) {
if (typeof table.meta.extras[i] === 'string') {
// text string definitions are deprecated. Support only one
// type of definition for backwards compatibility
let match = /^KEY ([^\s]+) \(([^)]+)\)$/.exec(table.meta.extras[i]);
if (!match) {
throw new Error('Unsupported table metadata definition ' + table.meta.extras[i]);
}
tableDefinition.indexes.push({
name: match[1],
columns: match[2].split(','),
type: 'BTREE',
is_unique: false
});
} else {
tableDefinition.indexes.push(table.meta.extras[i]);
}
}

return tableDefinition;
},

/**
* Generate the configuration files for the aggregation tables. This will
* generate two table definitions and two action files. One for the days
Expand Down Expand Up @@ -298,7 +379,15 @@ module.exports = {
return action;
},

generateAggregates: function (profile, xdmodConfigDirectory) {
/**
* generate the etlv2 table definitions and action files for all supported tables
* in a given etl profile and write them to the specified path.
*
* @param profile The etl profile object
* @param xdmodConfigDirectory the path to the XDMoD configuration directory under
* which the config files will be written.
*/
generateDefinitionFiles: function (profile, xdmodConfigDirectory) {
var etlv2ConfigDir = xdmodConfigDirectory + '/etl';
var tables = profile.getAggregationTables();

Expand All @@ -324,5 +413,19 @@ module.exports = {
mkdirAndWrite(tableDefnDir, generateJobListTableIdentifier(table), joblistTableDefn);
}
}

var facttables = profile.getTables();

for (let t in facttables) {
if (facttables.hasOwnProperty(t)) {
let table = facttables[t];
let tableDefnDir = etlv2ConfigDir + '/etl_tables.d/' + table.meta.schema.substring(5);

[true, false].forEach(function (isErrorTable) {
let tableDefn = module.exports.createDynamicTableDefinition(table, isErrorTable);
mkdirAndWrite(tableDefnDir, tableDefn.name, tableDefn);
});
}
}
}
};
211 changes: 114 additions & 97 deletions etl/js/lib/mysql.js
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,7 @@
*
*/

var mysql = require('mysql'),
util = require('util');
var util = require('util');

function dynamicSort(property) {
return function (obj1,obj2) {
Expand Down Expand Up @@ -72,6 +71,117 @@ var DynamicTable = module.exports.DynamicTable = function(table) {
return stmt;
};

/**
 * Return the column descriptors grouped for output: required dimensions
 * first, then optional dimensions, then metrics. Inside each group the
 * descriptors follow the sorted order of their column names.
 *
 * A column with nullable === false is a dimension ('required' when its def
 * is null, 'optional' otherwise); every other column is a 'metric'.
 *
 * The returned objects are the ones stored in this.columns; each is
 * annotated in place with its `name` and `dimension_type`.
 */
this.getColumns = function () {
    var groups = { required: [], optional: [], metric: [] };

    for (const key of Object.keys(this.columns).sort()) {
        if (!this.columns.hasOwnProperty(key)) {
            continue;
        }

        const descriptor = this.columns[key];
        descriptor.name = key;

        let kind = 'metric';
        if (descriptor.nullable === false) {
            kind = descriptor.def === null ? 'required' : 'optional';
        }

        descriptor.dimension_type = kind;
        groups[kind].push(descriptor);
    }

    return [...groups.required, ...groups.optional, ...groups.metric];
};

/**
 * Select the field list for one of the two tables backing this dynamic table.
 * @param isErrorTable truthy to get the error table fields, falsy to get the fact (main) table fields
 * @return the result of getDynamicErrorTableFields() or getDynamicFactTableFields()
 */
this.getDynamicTableFields = function (isErrorTable) {
    return isErrorTable
        ? this.getDynamicErrorTableFields()
        : this.getDynamicFactTableFields();
};

/**
 * Build the field list of the fact table: the auto-increment `_id`, every
 * column returned by getColumns() (in that order), then the `_version` and
 * `last_modified` bookkeeping fields.
 *
 * @return a new array; the column entries are the objects from getColumns().
 */
this.getDynamicFactTableFields = function () {
    var idField = {
        name: '_id',
        type: 'int32',
        nullable: false,
        extra: 'auto_increment'
    };
    var versionField = {
        name: '_version',
        type: 'int32',
        nullable: false
    };
    var lastModifiedField = {
        name: 'last_modified',
        type: 'timestamp',
        nullable: false,
        def: 'CURRENT_TIMESTAMP',
        extra: 'ON UPDATE CURRENT_TIMESTAMP'
    };

    return [idField].concat(this.getColumns(), [versionField, lastModifiedField]);
};

/**
 * Build the field list of the error table: `_id`, one field per column
 * returned by getColumns() (in that order), then `_version`.
 *
 * Required dimensions keep their own type, length and nullability and are
 * commented 'DIMENSION VALUE'. Every other column becomes a nullable int32
 * commented 'ERROR CODE'.
 *
 * @return a new array of newly created field descriptors.
 */
this.getDynamicErrorTableFields = function () {
    var tableFields = [{
        name: '_id',
        type: 'int32',
        nullable: false
    }];

    for (const column of this.getColumns()) {
        if (column.dimension_type === 'required') {
            const dimensionField = {
                name: column.name,
                type: column.type,
                nullable: column.nullable,
                comments: 'DIMENSION VALUE'
            };
            // createDynamicTableDefinition (etlv2.js) derives the SQL type from
            // `type` and `length`; without the length a string dimension would
            // be created with the default width instead of its declared one.
            if (column.length !== undefined) {
                dimensionField.length = column.length;
            }
            tableFields.push(dimensionField);
        } else {
            tableFields.push({
                name: column.name,
                type: 'int32',
                nullable: true,
                comments: 'ERROR CODE'
            });
        }
    }

    tableFields.push({
        name: '_version',
        type: 'int32',
        nullable: false
    });

    return tableFields;
};

/**
* return an array of arrays containing the column type, name, description and units
*/
Expand Down Expand Up @@ -107,67 +217,6 @@ var DynamicTable = module.exports.DynamicTable = function(table) {
return reqDims.concat(restDims).concat(metrics);
};

/**
 * Build the SQL statements that create the fact table for this dynamic table.
 *
 * @return an array of SQL strings: the first entry is the
 * CREATE TABLE IF NOT EXISTS statement for this.name; when this.triggers is
 * set, one further entry is appended for every before/after x
 * insert/update/del action present in this.triggers.
 *
 * Side effect: every object in this.columns gets its `name` and `sqlType`
 * properties assigned.
 */
this.getCreateTableStatement = function() {
var reqDims = [],
restDims = [],
metrics = [],
colKeys = Object.keys(this.columns).sort();
// colKeys is an array, so `col` is the array index (as a string) of each sorted column name.
for(var col in colKeys) {
var column = this.columns[colKeys[col]];
column.name = colKeys[col];
column.sqlType = sqlType(column.type, column.length);

// The column comment text is passed through mysql.escape(); a missing comment becomes ''.
var comments = 'COMMENT ' + mysql.escape(column.comments?column.comments:'');
if(column.nullable === false ) {
if(column.def === null) {
// non-nullable without default: collected in reqDims
reqDims.push(column.name + ' ' + column.sqlType + ' NOT NULL ' + comments);
} else {
// non-nullable with default: collected in restDims
// NOTE(review): column.def is concatenated into the SQL text as-is.
restDims.push(column.name + ' ' + column.sqlType + ' NOT NULL DEFAULT \'' + column.def + '\' ' + comments);
}
} else{
// everything else is collected in metrics, defaulting to NULL or the quoted def
metrics.push(column.name + ' ' + column.sqlType + ' DEFAULT ' + (column.def === null? 'NULL':'\'' + column.def + '\'') + ' ' + comments);
}
}

// Body of the CREATE TABLE statement, in output order.
// NOTE(review): join() on an empty group (or empty this.extras) yields '',
// which leaves an empty entry between two commas - confirm this cannot
// happen for the tables in use.
var allDims = [
' _id INT NOT NULL AUTO_INCREMENT',
reqDims.join(',\n '),
restDims.join(',\n '),
metrics.join(',\n '),
'_version INT NOT NULL',
'`last_modified` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP',
'UNIQUE KEY pk_index (' + this.meta.unique.join(',') + ')',
this.extras.join(',\n '),
'PRIMARY KEY (_id)'
];

var ret = ['CREATE TABLE IF NOT EXISTS ' + this.name + '(\n' + allDims.join(',\n ') + '\n) engine = myisam'];

if(this.triggers) {
// Maps from the key parts used in this.triggers (e.g. 'before_insert') to SQL keywords.
var verbs = { before: "BEFORE", after: "AFTER" };
var nouns = { insert: "INSERT", update: "UPDATE", del: "DELETE" };

for(var verb in verbs) {
for(var noun in nouns) {
var action = verb + "_" + noun;
if(this.triggers.hasOwnProperty(action)) {
// Trigger name is schema-qualified: table name followed by verb and noun.
var triggername = "`"+this.meta.schema+ "`.`" + this.name + verb + noun + "`";
var stmt = "DELIMITER $$\n";
stmt += "DROP TRIGGER IF EXISTS " + triggername + "$$\n";
stmt += "USE `"+this.meta.schema+"`$$\n";
stmt += "CREATE TRIGGER " + triggername + "\n";
stmt += verbs[verb] + ' ' + nouns[noun] + ' ON `' + this.name + '`\n';
stmt += "FOR EACH ROW\nBEGIN\n";
// The configured trigger body is inserted verbatim between BEGIN and END.
stmt += this.triggers[action];
stmt += "END$$\nDELIMITER ;\n";
ret.push(stmt);
}
}
}
}

return ret;
},
this.getAggregationTableFields = function() {
var ret = [],
colKeys = Object.keys(this.columns).sort();
Expand Down Expand Up @@ -197,40 +246,6 @@ var DynamicTable = module.exports.DynamicTable = function(table) {
+ ' into ' + this.meta.schema + '.' + this.name + '_errors (_id,' + allEntries.join(',') + ',_version)'
+ ' values (:_id, :' + allEntries.join(',:') + ',' + _version + ')';
},
/**
 * Build the SQL statement that creates the error table (this.name + '_errors').
 *
 * Non-nullable columns without a default keep their SQL type and are
 * commented 'DIMENSION VALUE'; all other columns are declared as nullable
 * int columns commented 'ERROR CODE'.
 *
 * @return a single CREATE TABLE IF NOT EXISTS statement as a string.
 *
 * Side effect: every object in this.columns gets its `sqlType` property assigned.
 */
this.getCreateErrorTableStatement = function() {
var reqDims = [],
restDims = [],
metrics = [],
colKeys = Object.keys(this.columns).sort();
// colKeys is an array, so `col` is the array index (as a string) of each sorted column name.
for(var col in colKeys) {
var column = this.columns[colKeys[col]];
column.sqlType = sqlType(column.type, column.length);

if(column.nullable === false) {
if(column.def === null) {
reqDims.push(colKeys[col] + ' ' + column.sqlType + ' NOT NULL COMMENT \'DIMENSION VALUE\'');
} else {
restDims.push(colKeys[col] + ' int DEFAULT NULL COMMENT \'ERROR CODE\'');
}
} else
{
metrics.push(colKeys[col] + ' int DEFAULT NULL COMMENT \'ERROR CODE\'');
}
}

// Body of the CREATE TABLE statement, in output order.
// NOTE(review): join() on an empty group (or empty this.meta.extras) yields '',
// which leaves an empty entry between two commas - confirm this cannot
// happen for the tables in use.
var allDims = [
' _id INT NOT NULL',
reqDims.join(',\n '),
restDims.join(',\n '),
metrics.join(',\n '),
'_version INT NOT NULL'
];
// The extra definitions come from this.meta.extras here.
allDims.push( this.meta.extras.join(',\n '), ' PRIMARY KEY (_id)'/*, 'KEY version_index (_version)'*/);

var ret = 'CREATE TABLE IF NOT EXISTS ' + this.name + '_errors (' + allDims.join(',\n ') + '\n) engine = myisam';

return ret;
}
}

module.exports.sqlType = function (type, length) {
Expand All @@ -245,6 +260,8 @@ module.exports.sqlType = function (type, length) {
return 'double';
case 'string':
return 'varchar(' + (length !== undefined ? length : 50) + ')';
case 'timestamp':
return 'timestamp';
case 'array':
throw Error('Type ' + type + ' should not be in a table as a column');
default:
Expand Down

0 comments on commit f402bde

Please sign in to comment.