Merge pull request #42 from keboola/martin-snowflake-slices-limit
Snowflake slices limit
Halama authored May 3, 2018
2 parents 1458238 + b1114df commit 76d3a38
Showing 8 changed files with 152 additions and 79 deletions.
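In short: sliced (manifest) imports now issue one COPY INTO per batch of at most 1000 slice files instead of listing every slice in a single statement, since Snowflake caps the number of entries a single FILES clause accepts (1000, matching the new chunk size). A minimal sketch of the batching idea, assuming a caller that already has the slice paths; the helper name and callable are illustrative, not part of this commit:

<?php
// Sketch only: mirrors the array_chunk() loop added to CsvImportBase below.
// One COPY INTO ... FILES = (...) statement is issued per chunk of <= 1000 slices.
const SLICED_FILES_CHUNK_SIZE = 1000;

function copySlicesInChunks(array $slicesPaths, callable $executeCopyForChunk): void
{
    foreach (array_chunk($slicesPaths, SLICED_FILES_CHUNK_SIZE) as $chunk) {
        $executeCopyForChunk($chunk);
    }
}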
4 changes: 2 additions & 2 deletions src/Snowflake/Connection.php
@@ -81,10 +81,10 @@ public function __construct(array $options)
if (stristr($e->getMessage(), "S1000") !== false) {
$attemptNumber++;
if ($attemptNumber > $maxBackoffAttempts) {
throw new Exception("Initializing Snowflake connection failed: " . $e->getMessage(), null, $e);
throw new Exception("Initializing Snowflake connection failed: " . $e->getMessage(), 0, $e);
}
} else {
throw new Exception("Initializing Snowflake connection failed: " . $e->getMessage(), null, $e);
throw new Exception("Initializing Snowflake connection failed: " . $e->getMessage(), 0, $e);
}
}
} while ($this->connection === null);
6 changes: 2 additions & 4 deletions src/Snowflake/CsvImport.php
@@ -9,12 +9,10 @@ class CsvImport extends CsvImportBase
protected function importDataToStagingTable(string $stagingTableName, array $columns, array $sourceData): void
{
foreach ($sourceData as $csvFile) {
$this->importTable(
$this->importTableFromCsv(
$stagingTableName,
$csvFile,
[
'isManifest' => false,
]
false
);
}
}
176 changes: 111 additions & 65 deletions src/Snowflake/CsvImportBase.php
@@ -11,6 +11,8 @@

abstract class CsvImportBase extends ImportBase
{
private const SLICED_FILES_CHUNK_SIZE = 1000;

/** @var string */
protected $s3key;

@@ -33,14 +35,7 @@ public function __construct(
$this->s3region = $s3region;
}

/**
* @param string $tableName
* @param CsvFile $csvFile
* @param array $options
* - isManifest
* @throws Exception
*/
protected function importTable(string $tableName, CsvFile $csvFile, array $options): void
protected function importTableFromCsv(string $tableName, CsvFile $csvFile, bool $isSliced): void
{
if ($csvFile->getEnclosure() && $csvFile->getEscapedBy()) {
throw new Exception(
@@ -53,9 +48,10 @@ protected function importTable(string $tableName, CsvFile $csvFile, array $optio
try {
$timerName = 'copyToStaging-' . $csvFile->getBasename();
Debugger::timer($timerName);
$results = $this->connection->fetchAll($this->generateCopyCommand($tableName, $csvFile, $options));
foreach ($results as $result) {
$this->importedRowsCount += (int) $result['rows_loaded'];
if ($isSliced) {
$this->importTableFromSlicedFile($tableName, $csvFile);
} else {
$this->importTableFromSingleFile($tableName, $csvFile);
}
$this->addTimer($timerName, Debugger::timer($timerName));
} catch (\Throwable $e) {
@@ -67,20 +63,109 @@ protected function importTable(string $tableName, CsvFile $csvFile, array $optio
}
}

/**
* @param string $tableName
* @param CsvFile $csvFile
* @param array $options
* - isManifest
* @return string
*/
private function generateCopyCommand(string $tableName, CsvFile $csvFile, array $options): string
private function importTableFromSingleFile(string $stableName, CsvFile $csvFile): void
{
$csvOptions = $this->createCopyCommandCsvOptions(
$csvFile,
$this->getIgnoreLines()
);
$this->executeCopyCommand(
$this->generateSingleFileCopyCommand(
$stableName,
$csvFile->getPathname(),
$csvOptions
)
);
}

private function importTableFromSlicedFile(string $tableName, CsvFile $csvFile): void
{
$csvOptions = $this->createCopyCommandCsvOptions(
$csvFile,
$this->getIgnoreLines()
);
$parsedS3Path = parse_url($csvFile->getPathname());

$slicesPaths = $this->getFilesToDownloadFromManifest(
$parsedS3Path['host'],
$parsedS3Path['path']
);
foreach (array_chunk($slicesPaths, self::SLICED_FILES_CHUNK_SIZE) as $slicesChunk) {
$this->executeCopyCommand(
$this->generateSlicedFileCopyCommand(
$tableName,
$parsedS3Path['host'],
$slicesChunk,
$csvOptions
)
);
}
}

private function executeCopyCommand(string $sql): void
{
$results = $this->connection->fetchAll($sql);
foreach ($results as $result) {
$this->importedRowsCount += (int) $result['rows_loaded'];
}
}

private function generateSingleFileCopyCommand(string $tableName, string $s3path, array $csvOptions): string
{
return sprintf(
"COPY INTO %s FROM %s
CREDENTIALS = (AWS_KEY_ID = %s AWS_SECRET_KEY = %s)
REGION = %s
FILE_FORMAT = (TYPE=CSV %s)",
$this->nameWithSchemaEscaped($tableName),
$this->quote($s3path),
$this->quote($this->s3key),
$this->quote($this->s3secret),
$this->quote($this->s3region),
implode(
' ',
$csvOptions
)
);
}

private function generateSlicedFileCopyCommand(string $tableName, string $s3Bucket, array $slicesPaths, array $csvOptions): string
{
$s3Prefix = sprintf('s3://%s', $s3Bucket);
return sprintf(
"COPY INTO %s FROM %s
CREDENTIALS = (AWS_KEY_ID = %s AWS_SECRET_KEY = %s)
REGION = %s
FILE_FORMAT = (TYPE=CSV %s)
FILES = (%s)",
$this->nameWithSchemaEscaped($tableName),
$this->quote($s3Prefix),
$this->quote($this->s3key),
$this->quote($this->s3secret),
$this->quote($this->s3region),
implode(
' ',
$csvOptions
),
implode(
', ',
array_map(
function ($file) use ($s3Prefix) {
return $this->quote(str_replace($s3Prefix . '/', '', $file));
},
$slicesPaths
)
)
);
}

private function createCopyCommandCsvOptions(CsvFile $csvFile, int $ignoreLinesCount): array
{
$csvOptions = [];
$csvOptions[] = sprintf('FIELD_DELIMITER = %s', $this->quote($csvFile->getDelimiter()));

if ($this->getIgnoreLines()) {
$csvOptions[] = sprintf('SKIP_HEADER = %d', $this->getIgnoreLines());
if ($ignoreLinesCount > 0) {
$csvOptions[] = sprintf('SKIP_HEADER = %d', $ignoreLinesCount);
}

if ($csvFile->getEnclosure()) {
@@ -90,53 +175,16 @@ private function generateCopyCommand(string $tableName, CsvFile $csvFile, array
$csvOptions[] = sprintf("ESCAPE_UNENCLOSED_FIELD = %s", $this->quote($csvFile->getEscapedBy()));
}

if (empty($options['isManifest'])) {
return sprintf(
"COPY INTO %s FROM %s
CREDENTIALS = (AWS_KEY_ID = %s AWS_SECRET_KEY = %s)
REGION = %s
FILE_FORMAT = (TYPE=CSV %s)",
$this->nameWithSchemaEscaped($tableName),
$this->quote($csvFile->getPathname()),
$this->quote($this->s3key),
$this->quote($this->s3secret),
$this->quote($this->s3region),
implode(' ', $csvOptions)
);
} else {
$parsedS3Path = parse_url($csvFile->getPathname());
$s3Prefix = 's3://' . $parsedS3Path['host'];
return sprintf(
"COPY INTO %s FROM %s
CREDENTIALS = (AWS_KEY_ID = %s AWS_SECRET_KEY = %s)
REGION = %s
FILE_FORMAT = (TYPE=CSV %s)
FILES = (%s)",
$this->nameWithSchemaEscaped($tableName),
$this->quote($s3Prefix), // s3 bucket
$this->quote($this->s3key),
$this->quote($this->s3secret),
$this->quote($this->s3region),
implode(' ', $csvOptions),
implode(
', ',
array_map(
function ($file) use ($s3Prefix) {
return $this->quote(str_replace($s3Prefix . '/', '', $file));
},
$this->getFilesToDownloadFromManifest($csvFile->getPathname())
)
)
);
}
return $csvOptions;
}


private function quote(string $value): string
{
return "'" . addslashes($value) . "'";
}

private function getFilesToDownloadFromManifest(string $path): array
private function getFilesToDownloadFromManifest(string $bucket, string $path): array
{
$s3Client = new \Aws\S3\S3Client([
'credentials' => [
@@ -147,12 +195,10 @@ private function getFilesToDownloadFromManifest(string $path): array
'version' => '2006-03-01',
]);

$path = parse_url($path);

try {
$response = $s3Client->getObject([
'Bucket' => $path['host'],
'Key' => ltrim($path['path'], '/'),
'Bucket' => $bucket,
'Key' => ltrim($path, '/'),
]);
} catch (AwsException $e) {
throw new Exception('Unable to download file from S3: ' . $e->getMessage());
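For one such chunk, generateSlicedFileCopyCommand produces a statement along these lines (target table, region, credentials, CSV options and the two slice keys are placeholders for illustration; a real chunk lists up to 1000 bucket-relative keys):

COPY INTO "public"."stagingTable" FROM 's3://example-bucket'
CREDENTIALS = (AWS_KEY_ID = '...' AWS_SECRET_KEY = '...')
REGION = 'us-east-1'
FILE_FORMAT = (TYPE=CSV FIELD_DELIMITER = ',' SKIP_HEADER = 1)
FILES = ('manifests/2cols-large/sliced.csv_0', 'manifests/2cols-large/sliced.csv_1')

Note the str_replace() in the array_map above: slice URLs in the manifest are absolute s3:// paths, but the FILES entries must be relative to the bucket named in the FROM clause, so the s3://bucket/ prefix is stripped before the list is built.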
6 changes: 2 additions & 4 deletions src/Snowflake/CsvManifestImport.php
@@ -9,12 +9,10 @@ class CsvManifestImport extends CsvImportBase
protected function importDataToStagingTable(string $stagingTableName, array $columns, array $sourceData): void
{
foreach ($sourceData as $csvFile) {
$this->importTable(
$this->importTableFromCsv(
$stagingTableName,
$csvFile,
[
'isManifest' => true,
]
true
);
}
}
4 changes: 2 additions & 2 deletions tests/Snowflake/ConnectionTest.php
@@ -16,7 +16,7 @@ public function testConnectionWithoutDbAndWarehouse(): void
$connection = new Connection([
'host' => getenv('SNOWFLAKE_HOST'),
'port' => getenv('SNOWFLAKE_PORT'),
'user' => getenv('SNOWFLAKE_WAREHOUSE'),
'user' => getenv('SNOWFLAKE_USER'),
'password' => getenv('SNOWFLAKE_PASSWORD'),
]);

@@ -83,7 +83,7 @@ private function createConnection(): Connection
'port' => getenv('SNOWFLAKE_PORT'),
'database' => getenv('SNOWFLAKE_DATABASE'),
'warehouse' => getenv('SNOWFLAKE_WAREHOUSE'),
'user' => getenv('SNOWFLAKE_WAREHOUSE'),
'user' => getenv('SNOWFLAKE_USER'),
'password' => getenv('SNOWFLAKE_PASSWORD'),
]);
}
11 changes: 9 additions & 2 deletions tests/Snowflake/ImportTest.php
@@ -244,10 +244,17 @@ public function fullImportData(): array
$lemmaHeader = array_shift($expectedLemma);
$expectedLemma = array_values($expectedLemma);

// large sliced manifest
$expectedLargeSlicedManifest = [];
for ($i = 0; $i <= 1500; $i++) {
$expectedLargeSlicedManifest[] = ['a', 'b'];
}

$s3bucket = getenv(self::AWS_S3_BUCKET_ENV);

return [
// full imports
[[new CsvFile("s3://{$s3bucket}/manifests/2cols-large/sliced.csvmanifest")], $escapingHeader, $expectedLargeSlicedManifest, 'out.csv_2Cols', 'manifest' ],
[[new CsvFile("s3://{$s3bucket}/empty.manifest")], $escapingHeader, [], 'out.csv_2Cols', 'manifest' ],
[[new CsvFile("s3://{$s3bucket}/lemma.csv")], $lemmaHeader, $expectedLemma, 'out.lemma'],
[[new CsvFile("s3://{$s3bucket}/standard-with-enclosures.csv")], $escapingHeader, $expectedEscaping, 'out.csv_2Cols'],
@@ -287,8 +294,8 @@ public function fullImportData(): array
[new CsvFile("s3://{$s3bucket}/with-ts.csv")],
['col1', 'col2', '_timestamp'],
[
['a', 'b', 'Mon, 10 Nov 2014 13:12:06 Z'],
['c', 'd', 'Mon, 10 Nov 2014 14:12:06 Z'],
['a', 'b', '2014-11-10 13:12:06.000'],
['c', 'd', '2014-11-10 14:12:06.000'],
],
'out.csv_2Cols',
],
2 changes: 2 additions & 0 deletions tests/_data/csv-import/manifests/2cols-large/.gitignore
@@ -0,0 +1,2 @@
*
!.gitignore
22 changes: 22 additions & 0 deletions tests/loadS3.php
@@ -46,6 +46,26 @@
]);
}

// generate files
$largeManifest = [
'entries' => [],
];
for ($i = 0; $i <= 1500; $i++) {
$sliceName = sprintf('sliced.csv_%d', $i);
file_put_contents(
$source . '/manifests/2cols-large/' . $sliceName,
"\"a\",\"b\"\n"
);
$largeManifest['entries'][] = [
'url' => sprintf("s3://%s/manifests/2cols-large/%s", $bucket, $sliceName),
'mandatory' => true,
];
}
file_put_contents(
$source . '/manifests/2cols-large/sliced.csvmanifest',
json_encode($largeManifest)
);

// Create a transfer object.
$manager = new \Aws\S3\Transfer($client, $source, $dest, [
'debug' => true,
@@ -108,6 +128,8 @@
],
];

// 4. More than 1000 slices

$client->putObject([
'Bucket' => $bucket,
'Key' => '02_tw_accounts.csv.invalid.manifest',
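The sliced.csvmanifest written above is a plain JSON file; for the first two of the 1501 generated slices it would look roughly like this (bucket name elided):

{
  "entries": [
    {"url": "s3://<bucket>/manifests/2cols-large/sliced.csv_0", "mandatory": true},
    {"url": "s3://<bucket>/manifests/2cols-large/sliced.csv_1", "mandatory": true}
  ]
}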
