Skip to content

Commit

Permalink
Refactoring of ETL/DbEntity into ETL/DbModel (#138)
Browse files Browse the repository at this point in the history
* Added tests for DbEntity\Table
* Moved namespace ETL\DbEntity to ETL\DbModel
* Refactoring of ETL/DbModel to remove old code and support upcoming features
* Updated commit for xdmod-test-artifacts
* Fix record formula verification and saving of overseer restriction value
* Add @plessbd blacklist filters
* Ignore case where temp directory already exists for tests
  • Loading branch information
smgallo authored May 15, 2017
1 parent 6fc2c89 commit 58b2bb8
Show file tree
Hide file tree
Showing 37 changed files with 3,715 additions and 3,473 deletions.
2 changes: 1 addition & 1 deletion classes/ETL/Aggregator/aAggregator.php
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ public function execute(EtlOverseerOptions $etlOverseerOptions)
// The aggregation unit must be set for the AggregationTable

foreach ( $this->etlDestinationTableList as $etlTableKey => $etlTable ) {
$etlTable->setAggregationUnit($aggregationUnit);
$etlTable->aggregation_unit = $aggregationUnit;
}

$this->variableMap['AGGREGATION_UNIT'] = $aggregationUnit;
Expand Down
64 changes: 32 additions & 32 deletions classes/ETL/Aggregator/pdoAggregator.php
Original file line number Diff line number Diff line change
Expand Up @@ -62,16 +62,16 @@
use ETL\aOptions;
use ETL\EtlOverseerOptions;
use ETL\DataEndpoint\Mysql;
use ETL\DbEntity\AggregationTable;
use ETL\DbEntity\Query;
use ETL\DbEntity\Table;
use ETL\DbModel\AggregationTable;
use ETL\DbModel\Query;
use ETL\DbModel\Table;
use ETL\Utilities;
use ETL\Configuration\EtlConfiguration;

use \Log;
use \PDOException;
use Log;
use PDOException;
use PDOStatement;
use \PDO;
use PDO;

class pdoAggregator extends aAggregator
{
Expand Down Expand Up @@ -106,8 +106,7 @@ class pdoAggregator extends aAggregator
*
* @param IngestorOptions $options Options specific to this Ingestor
* @param EtlConfiguration $etlConfig Parsed configuration options for this ETL
* @param string $defaultTablePrefix Default table prefix as defined in the child class (e.g.,
* "jobfact_by_")
* @param Log $logger PEAR Log object for system logging
* ------------------------------------------------------------------------------------------
*/

Expand Down Expand Up @@ -208,14 +207,14 @@ public function initialize(EtlOverseerOptions $etlOverseerOptions = null)
// but it doesn't matter because the naming will still be consistent.

$columnNames = $this->etlDestinationTable->getColumnNames();
$missingColumnNames = array_diff($this->etlSourceQuery->getGroupBys(), $columnNames);
$missingColumnNames = array_diff($this->etlSourceQuery->groupby, $columnNames);

if ( 0 != count($missingColumnNames) ) {
$msg = "Columns in group by not found in table: " . implode(", ", $missingColumnNames);
$this->logAndThrowException($msg);
}

$missingColumnNames = array_diff(array_keys($this->etlSourceQuery->getRecords()), $columnNames);
$missingColumnNames = array_diff(array_keys($this->etlSourceQuery->records), $columnNames);

if ( 0 != count($missingColumnNames) ) {
$msg = "Columns in formulas not found in table: " . implode(", ", $missingColumnNames);
Expand Down Expand Up @@ -271,18 +270,18 @@ protected function createDestinationTableObjects()
$this->destinationEndpoint->getSystemQuoteChar(),
$this->logger
);
$this->etlDestinationTable->setSchema($this->destinationEndpoint->getSchema());
$this->etlDestinationTable->schema = $this->destinationEndpoint->getSchema();

if ( isset($this->options->table_prefix) &&
$this->options->table_prefix != $this->etlDestinationTable->getTablePrefix() )
$this->options->table_prefix != $this->etlDestinationTable->table_prefix )
{
$msg =
"Overriding table prefix from " .
$this->etlDestinationTable->getTablePrefix()
$this->etlDestinationTable->table_prefix
. " to " .
$this->options->table_prefix;
$this->logger->debug($msg);
$this->etlDestinationTable->setTablePrefix($this->options->table_prefix);
$this->etlDestinationTable->table_prefix = $this->options->table_prefix;
}

// Aggregation does not support multiple destination tables but we must still populate
Expand All @@ -302,7 +301,7 @@ protected function performPreExecuteTasks()
{
// To support programmatic manipulation of the source Query object, save off the description
// of the first join (from) table
$sourceJoins = $this->etlSourceQuery->getJoins();
$sourceJoins = $this->etlSourceQuery->joins;
$this->etlSourceQueryOrigFromTable = array_shift($sourceJoins);
$this->etlSourceQueryModified = false;

Expand Down Expand Up @@ -355,7 +354,7 @@ protected function performPreAggregationUnitTasks($aggregationUnit)

$this->manageTable($substitutedEtlAggregationTable, $this->destinationEndpoint);

if ( $this->options->disable_keys && "myisam" == strtolower($etlTable->getEngine()) ) {
if ( $this->options->disable_keys && "myisam" == strtolower($etlTable->engine) ) {
$this->logger->info("Disable keys on $qualifiedDestTableName");
$sqlList[] = "ALTER TABLE $qualifiedDestTableName DISABLE KEYS";
}
Expand Down Expand Up @@ -384,7 +383,7 @@ protected function performPostAggregationUnitTasks($aggregationUnit, $numAggrega
$sqlList[] = "OPTIMIZE TABLE $qualifiedDestTableName";
}

if ( $this->options->disable_keys && "myisam" == strtolower($etlTable->getEngine()) ) {
if ( $this->options->disable_keys && "myisam" == strtolower($etlTable->engine) ) {
$sqlList[] = "ALTER TABLE $qualifiedDestTableName ENABLE KEYS";
}
}
Expand Down Expand Up @@ -463,7 +462,7 @@ protected function getDirtyAggregationPeriods($aggregationUnit)

$firstTable = $this->etlSourceQueryOrigFromTable;

$tableName = $this->sourceEndpoint->quoteSystemIdentifier($firstTable->getName());
$tableName = $this->sourceEndpoint->quoteSystemIdentifier($firstTable->name);

$aggregationPeriodQueryOptions = ( isset($this->parsedDefinitionFile->aggregation_period_query)
? $this->parsedDefinitionFile->aggregation_period_query
Expand Down Expand Up @@ -511,13 +510,14 @@ protected function getDirtyAggregationPeriods($aggregationUnit)
);

$this->logger->debug("Discover table $fromTable");
$firstTableDef = Table::discover($fromTable, $this->sourceEndpoint, null, $this->logger);

$firstTableDef = new Table(null, null, $this->logger);

// If we are in dryrun mode the table may not have been created yet but we still want to
// be able to display the generated queries so simply set the start and end day id
// fields.

if ( false === $firstTableDef ) {
if ( false === $firstTableDef->discover($fromTable, $this->sourceEndpoint) ) {
if ( $this->getEtlOverseerOptions()->isDryrun() ) {
$startDayIdField = "start_day_id";
$endDayIdField = "end_day_id";
Expand Down Expand Up @@ -641,7 +641,7 @@ protected function getDirtyAggregationPeriods($aggregationUnit)
),
'joins' => array(
(object) array(
'name' => $firstTable->getName(),
'name' => $firstTable->name,
'schema' => $this->sourceEndpoint->getSchema()
)
)
Expand All @@ -658,7 +658,7 @@ protected function getDirtyAggregationPeriods($aggregationUnit)
$recordRangeQuery = new Query($query, $this->sourceEndpoint->getSystemQuoteChar());
$this->getEtlOverseerOptions()->applyOverseerRestrictions($recordRangeQuery, $this->utilityEndpoint, $this);

$minMaxJoin = "( " . $recordRangeQuery->getSelectSql() . " ) record_ranges";
$minMaxJoin = "( " . $recordRangeQuery->getSql() . " ) record_ranges";
$dateRangeRestrictionSql = "d.id BETWEEN record_ranges.start_period_id AND record_ranges.end_period_id";

} // else ( $this->getEtlOverseerOptions()->isForce() )
Expand Down Expand Up @@ -767,11 +767,11 @@ protected function _execute($aggregationUnit)
// Remove the first join (from) and replace it with the temporary table that we are
// going to create

$sourceJoins = $this->etlSourceQuery->getJoins();
$sourceJoins = $this->etlSourceQuery->joins;
$firstJoin = array_shift($sourceJoins);
$newFirstJoin = clone $firstJoin;
$newFirstJoin->setName($tmpTableName);
$newFirstJoin->setSchema($this->sourceEndpoint->getSchema());
$newFirstJoin->schema = $this->sourceEndpoint->getSchema();

$this->etlSourceQuery->deleteJoins();
$this->etlSourceQuery->addJoin($newFirstJoin);
Expand All @@ -786,7 +786,7 @@ protected function _execute($aggregationUnit)

// We are not optimizing but have previously, restore the original FROM clause

$sourceJoins = $this->etlSourceQuery->getJoins();
$sourceJoins = $this->etlSourceQuery->joins;
array_shift($sourceJoins);
$this->etlSourceQuery->deleteJoins();
$this->etlSourceQuery->addJoin($this->etlSourceQueryOrigFromTable);
Expand Down Expand Up @@ -889,7 +889,7 @@ protected function _execute($aggregationUnit)
$aggregationPeriodListOffset = 0;
$done = false;

$sourceJoins = $this->etlSourceQuery->getJoins();
$sourceJoins = $this->etlSourceQuery->joins;
$firstJoin = current($sourceJoins);
$tmpTableAlias = $firstJoin->getAlias();

Expand Down Expand Up @@ -940,7 +940,7 @@ protected function _execute($aggregationUnit)
$origTableName =
$this->sourceEndpoint->getSchema(true)
. "."
. $this->sourceEndpoint->quoteSystemIdentifier($this->etlSourceQueryOrigFromTable->getName());
. $this->sourceEndpoint->quoteSystemIdentifier($this->etlSourceQueryOrigFromTable->name);

try {
// Use the where clause from the aggregation query to create the temporary table
Expand Down Expand Up @@ -1280,7 +1280,7 @@ protected function buildSqlStatements($aggregationUnit, $includeSchema = true)

// *** Should this functionality be included in the Query itself? ***

$sourceRecords = $this->etlSourceQuery->getRecords();
$sourceRecords = $this->etlSourceQuery->records;

$substitutedRecordNames = array();
$duplicateRecords = array();
Expand All @@ -1307,18 +1307,18 @@ protected function buildSqlStatements($aggregationUnit, $includeSchema = true)
}
}

$this->selectSql = $this->etlSourceQuery->getSelectSql($includeSchema);
$this->selectSql = $this->etlSourceQuery->getSql($includeSchema);

$this->insertSql = "INSERT INTO " . $this->etlDestinationTable->getFullName($includeSchema) . "\n" .
"("
. implode(",\n", array_keys($this->etlSourceQuery->getRecords()))
. implode(",\n", array_keys($this->etlSourceQuery->records))
. ")\nVALUES\n("
. implode(",\n", Utilities::createPdoBindVarsFromArrayKeys($this->etlSourceQuery->getRecords()))
. implode(",\n", Utilities::createPdoBindVarsFromArrayKeys($this->etlSourceQuery->records))
. ")";

$this->optimizedInsertSql = "INSERT INTO " . $this->etlDestinationTable->getFullName($includeSchema) . "\n" .
"(" .
implode(",\n", array_keys($this->etlSourceQuery->getRecords()))
implode(",\n", array_keys($this->etlSourceQuery->records))
. ")\n" .
$this->selectSql;

Expand Down
Loading

0 comments on commit 58b2bb8

Please sign in to comment.