Skip to content

Commit 37c301f

Browse files
committed
use pcrov/jsonreader to stream json file
1 parent ec58afe commit 37c301f

File tree

9 files changed

+323
-30
lines changed

9 files changed

+323
-30
lines changed

Dockerfile

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
2424
debsig-verify \
2525
dirmngr \
2626
gpg-agent \
27+
libicu-dev \
2728
&& rm -r /var/lib/apt/lists/* \
2829
&& sed -i 's/^# *\(en_US.UTF-8\)/\1/' /etc/locale.gen \
2930
&& locale-gen \
@@ -64,6 +65,9 @@ RUN mkdir -p ~/.gnupg \
6465
&& gpg --batch --delete-key --yes $SNOWFLAKE_GPG_KEY \
6566
&& dpkg -i /tmp/snowflake-odbc.deb
6667

68+
RUN docker-php-ext-configure intl \
69+
&& docker-php-ext-install intl
70+
6771
## Composer - deps always cached unless changed
6872
# First copy only composer files
6973
COPY composer.* /code/

composer.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
"keboola/db-adapter-snowflake": "^1.0.0",
88
"keboola/php-component": "^7.0.1",
99
"microsoft/azure-storage-blob": "^1.4",
10+
"pcrov/jsonreader": "^1.0",
1011
"symfony/stopwatch": "^4.3"
1112
},
1213
"require-dev": {
@@ -32,6 +33,7 @@
3233
}
3334
},
3435
"scripts": {
36+
"testHuge": "php ./tests/testHugeManifest.php",
3537
"tests-unit": "phpunit tests/unit",
3638
"tests-functional": "phpunit tests/functional",
3739
"tests": [

src/Storage/ABS/SnowflakeImportAdapter.php

Lines changed: 46 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -64,30 +64,57 @@ public function getCopyCommands(
6464
ImportOptions $importOptions,
6565
string $stagingTableName
6666
): Generator {
67-
$filesToImport = $this->source->getManifestEntries();
68-
foreach (array_chunk($filesToImport, ImporterInterface::SLICED_FILES_CHUNK_SIZE) as $entries) {
69-
yield sprintf(
70-
'COPY INTO %s.%s
67+
$entriesInChunk = [];
68+
foreach ($this->source->getManifestEntries() as $entry) {
69+
$entriesInChunk[] = $entry;
70+
if (count($entriesInChunk) === ImporterInterface::SLICED_FILES_CHUNK_SIZE) {
71+
yield $this->getCopyCommand(
72+
$destination,
73+
$importOptions,
74+
$stagingTableName,
75+
$entriesInChunk
76+
);
77+
$entriesInChunk = [];
78+
}
79+
}
80+
yield $this->getCopyCommand(
81+
$destination,
82+
$importOptions,
83+
$stagingTableName,
84+
$entriesInChunk
85+
);
86+
}
87+
88+
    /**
     * Renders a single Snowflake COPY INTO statement for one chunk of manifest entries.
     *
     * The statement targets the staging table inside the destination's schema and reads
     * from the source container URL. Each entry's 'url' has the container URL removed
     * (strtr replaces every occurrence) before it is quoted and joined into FILES.
     *
     * NOTE(review): the SAS token is placed between quotes without QueryBuilder::quote();
     * presumably tokens never contain a single quote - confirm.
     *
89+
     * @param Table $destination
     * @param ImportOptions $importOptions passed to getCsvCopyCommandOptions() with the source CSV options
     * @param string $stagingTableName unquoted staging table name; quoted as an identifier here
     * @param array $entriesInChunk manifest entries; each one is read through its 'url' key
     * @return string the COPY INTO statement
90+
     */
91+
    private function getCopyCommand(
92+
        DestinationInterface $destination,
93+
        ImportOptions $importOptions,
94+
        string $stagingTableName,
95+
        array $entriesInChunk
96+
    ): string {
97+
        return sprintf(
98+
            'COPY INTO %s.%s
7199
FROM %s
72100
CREDENTIALS=(AZURE_SAS_TOKEN=\'%s\')
73101
FILE_FORMAT = (TYPE=CSV %s)
74102
FILES = (%s)',
75-
                QueryBuilder::quoteIdentifier($destination->getSchema()),
76-
                QueryBuilder::quoteIdentifier($stagingTableName),
77-
                QueryBuilder::quote($this->source->getContainerUrl()),
78-
                $this->source->getSasToken(),
79-
                implode(' ', $this->getCsvCopyCommandOptions($importOptions, $this->source->getCsvOptions())),
80-
                implode(
81-
                    ', ',
82-
                    array_map(
83-
                        function ($entry) {
84-
                            return QueryBuilder::quote(strtr($entry, [$this->source->getContainerUrl() => '']));
85-
                        },
86-
                        $entries
87-
                    )
103+
            QueryBuilder::quoteIdentifier($destination->getSchema()),
104+
            QueryBuilder::quoteIdentifier($stagingTableName),
105+
            QueryBuilder::quote($this->source->getContainerUrl()),
106+
            $this->source->getSasToken(),
107+
            implode(' ', $this->getCsvCopyCommandOptions($importOptions, $this->source->getCsvOptions())),
108+
            implode(
109+
                ', ',
110+
                array_map(
111+
                    function ($entry) {
112+
                        return QueryBuilder::quote(strtr($entry['url'], [$this->source->getContainerUrl() => '']));
113+
                    },
114+
                    $entriesInChunk
88115
)
89-
            );
90-
        }
116+
            )
117+
        );
91118
}
92119

93120
private function getCsvCopyCommandOptions(

src/Storage/ABS/SourceFile.php

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,12 @@
44

55
namespace Keboola\Db\ImportExport\Storage\ABS;
66

7+
use Generator;
78
use Keboola\Csv\CsvOptions;
89
use Keboola\Db\ImportExport\Backend\BackendImportAdapterInterface;
910
use Keboola\Db\ImportExport\Backend\ImporterInterface;
1011
use Keboola\Db\ImportExport\Backend\Snowflake\Importer as SnowflakeImporter;
12+
use Keboola\Db\ImportExport\Storage\ManifestReader;
1113
use Keboola\Db\ImportExport\Storage\NoBackendAdapterException;
1214
use Keboola\Db\ImportExport\Storage\SourceInterface;
1315
use MicrosoftAzure\Storage\Blob\BlobRestProxy;
@@ -54,10 +56,11 @@ public function getCsvOptions(): CsvOptions
5456
return $this->csvOptions;
5557
}
5658

57-
public function getManifestEntries(): array
59+
public function getManifestEntries(): Generator
5860
{
5961
if (!$this->isSliced) {
60-
return [$this->getContainerUrl() . $this->filePath];
62+
yield from [['url' => $this->getContainerUrl() . $this->filePath]];
63+
return;
6164
}
6265

6366
$SASConnectionString = sprintf(
@@ -74,9 +77,6 @@ public function getManifestEntries(): array
7477
);
7578

7679
$getResult = $blobClient->getBlob($this->container, $this->filePath);
77-
$manifest = \GuzzleHttp\json_decode(stream_get_contents($getResult->getContentStream()), true);
78-
return array_map(function ($entry) {
79-
return $entry['url'];
80-
}, $manifest['entries']);
80+
yield from ManifestReader::readEntries($getResult->getContentStream());
8181
}
8282
}

src/Storage/ManifestReader.php

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Keboola\Db\ImportExport\Storage;
6+
7+
use Generator;
8+
use pcrov\JsonReader\JsonReader;
9+
10+
/**
 * Streams the "entries" array of a JSON manifest one element at a time.
 */
class ManifestReader
{
    /**
     * Lazily yields every element of the manifest's "entries" array.
     *
     * The document is parsed incrementally with pcrov/jsonreader, so only the
     * element currently being yielded is decoded rather than the whole manifest.
     * The reader is closed when iteration finishes, when the consumer abandons
     * the generator early, and when parsing throws.
     *
     * @param resource $stream readable stream containing the manifest JSON
     * @return Generator yields the value of each "entries" element as returned by JsonReader::value()
     * @throws \UnexpectedValueException when the manifest has no "entries" array
     */
    public static function readEntries(
        $stream
    ): Generator {
        $reader = new JsonReader();
        $reader->stream($stream);

        try {
            if (!$reader->read('entries') || $reader->type() !== JsonReader::ARRAY) {
                throw new \UnexpectedValueException('Manifest does not contain an "entries" array.');
            }

            // Depth of the array node itself; its elements sit one level deeper.
            $depth = $reader->depth();

            // Step into the array. For an empty array this lands on the closing
            // node at the same depth, so the loop below is skipped instead of
            // yielding a bogus null entry.
            $hasNode = $reader->read();
            while ($hasNode && $reader->depth() > $depth) {
                yield $reader->value();
                // next() skips the current element's children and moves to its sibling.
                $hasNode = $reader->next();
            }
        } finally {
            $reader->close();
        }
    }
}

tests/HugeManifest.php

Lines changed: 195 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,195 @@
1+
<?php
2+
3+
declare(strict_types=1);
4+
5+
namespace Tests\Keboola\Db\ImportExport;
6+
7+
use Keboola\Csv\CsvOptions;
8+
use Keboola\Db\ImportExport\ImportOptions;
9+
use Keboola\Db\ImportExport\Storage;
10+
use Keboola\Temp\Temp;
11+
use MicrosoftAzure\Storage\Common\Internal\Resources;
12+
use Symfony\Component\Stopwatch\Stopwatch;
13+
14+
/**
 * Manual benchmark for generating COPY commands from a very large sliced-file manifest.
 *
 * run() recreates the Azure Blob Storage container, uploads a generated manifest with
 * SLICES_TOTAL entries, iterates over every generated COPY command and prints the memory
 * figures reported by Symfony Stopwatch. Only the manifest is uploaded; the slice files it
 * references are never created, because the commands are generated but not executed.
 */
class HugeManifest
{
    use ABSSourceTrait;

    private const MANIFEST_FILE_NAME = 'hugeManifest.json';
    private const SLICES_TOTAL = 500 * 1000;

    /**
     * @var string|null lazily resolved path of the local manifest file
     */
    private $manifestFile;

    /**
     * @var AbsLoader
     */
    private $loader;

    /**
     * @var Stopwatch
     */
    private $stopwatch;

    /**
     * @var Temp
     */
    private $temp;

    /**
     * @var string
     */
    private $containerName;

    /**
     * @var string
     */
    private $accountName;

    public function __construct(
        string $accountName,
        string $containerName
    ) {
        $this->accountName = $accountName;
        $this->containerName = $containerName;
        $this->loader = new AbsLoader($accountName, $containerName);
    }

    /**
     * Executes the whole benchmark and prints memory usage for each stage.
     */
    public function run(): void
    {
        $this->temp = new Temp();
        $this->temp->initRunFolder();

        $this->stopwatch = new Stopwatch();
        $this->stopwatch->start('test');
        $this->loader->deleteContainer();
        $this->loader->createContainer();

        $this->stopwatch->start('upload');
        $this->uploadManifestAndSlices();
        $event = $this->stopwatch->stop('upload');
        // Formatted like the other two figures so the three lines are comparable.
        echo 'max memory upload: ' . $this->getMemoryForHuman($event->getMemory()) . PHP_EOL;

        $this->stopwatch->start('commands');
        $this->generateCommands();
        $event = $this->stopwatch->stop('commands');
        echo 'max memory commands: ' . $this->getMemoryForHuman($event->getMemory()) . PHP_EOL;

        $this->loader->deleteContainer();
        $event = $this->stopwatch->stop('test');
        echo 'max memory: ' . $this->getMemoryForHuman($event->getMemory()) . PHP_EOL;
    }

    /**
     * Writes a manifest with exactly SLICES_TOTAL entries to a temp file and uploads it.
     *
     * @throws \RuntimeException when the local manifest file cannot be opened
     */
    private function uploadManifestAndSlices(): void
    {
        $this->printMemory();
        $manifest = $this->openManifestFile();

        echo 'Generating manifest' . PHP_EOL;
        $lastIndex = self::SLICES_TOTAL - 1;
        for ($i = 0; $i < self::SLICES_TOTAL; $i++) {
            $sliceName = sprintf('my_awesome_long_name_slice.csv_%d', $i);
            fwrite($manifest, sprintf(
                '{"url":"%s"}%s' . PHP_EOL,
                $this->getAbsUrl($sliceName),
                // JSON forbids a trailing comma after the last array element.
                $i === $lastIndex ? '' : ','
            ));
        }

        $this->closeManifestFile($manifest);

        echo PHP_EOL;

        echo 'Uploading manifest' . PHP_EOL;

        // Hand the SDK a stream instead of the file contents so the manifest is not
        // loaded into a PHP string, which would distort the memory measurement.
        $manifestStream = fopen($this->getManifestFileName(), 'rb');
        if ($manifestStream === false) {
            throw new \RuntimeException(
                sprintf('Cannot open manifest file "%s" for reading.', $this->getManifestFileName())
            );
        }
        try {
            $this->loader->getBlobService()->createBlockBlob(
                $this->containerName,
                self::MANIFEST_FILE_NAME,
                $manifestStream
            );
        } finally {
            // The SDK may already have closed the handle through its stream wrapper.
            if (is_resource($manifestStream)) {
                fclose($manifestStream);
            }
        }

        echo sprintf('Manifest file size: %s bytes', filesize($this->getManifestFileName())) . PHP_EOL;
        $this->printMemory();
    }

    /**
     * Prints the memory currently allocated to PHP in human readable form.
     */
    private function printMemory(): void
    {
        $memUsage = memory_get_usage(true);

        echo $this->getMemoryForHuman($memUsage);

        echo PHP_EOL;
    }

    /**
     * Formats a byte count as bytes, kilobytes or megabytes (rounded to two decimals).
     */
    private function getMemoryForHuman(int $memUsage): string
    {
        if ($memUsage < 1024) {
            return $memUsage . ' bytes';
        }
        if ($memUsage < 1048576) {
            return round($memUsage / 1024, 2) . ' kilobytes';
        }

        return round($memUsage / 1048576, 2) . ' megabytes';
    }

    /**
     * Creates the manifest file with the opening of the "entries" array and
     * returns a handle opened for appending.
     *
     * @return resource
     * @throws \RuntimeException when the file cannot be opened
     */
    private function openManifestFile()
    {
        file_put_contents($this->getManifestFileName(), '{"entries":[' . PHP_EOL);
        $handle = fopen($this->getManifestFileName(), 'a');
        if ($handle === false) {
            throw new \RuntimeException(
                sprintf('Cannot open manifest file "%s" for writing.', $this->getManifestFileName())
            );
        }

        return $handle;
    }

    /**
     * Returns the manifest path inside the temp folder, resolving it on first use.
     */
    private function getManifestFileName(): string
    {
        if ($this->manifestFile === null) {
            $this->manifestFile = $this->temp->getTmpFolder() . '/' . self::MANIFEST_FILE_NAME;
        }

        return $this->manifestFile;
    }

    /**
     * Builds the azure:// URL of a file in the benchmark container.
     */
    private function getAbsUrl(string $fileName): string
    {
        return sprintf(
            'azure://%s.%s/%s/%s',
            $this->accountName,
            Resources::BLOB_BASE_DNS_NAME,
            $this->containerName,
            $fileName
        );
    }

    /**
     * Terminates the JSON document and closes the manifest handle.
     *
     * @param resource $resource
     */
    private function closeManifestFile($resource): void
    {
        fwrite($resource, ']}');
        fclose($resource);
    }

    /**
     * Iterates over all COPY commands generated from the uploaded manifest,
     * recording a stopwatch lap for each one. The commands are not executed.
     */
    private function generateCommands(): void
    {
        $source = new Storage\ABS\SourceFile(
            $this->containerName,
            self::MANIFEST_FILE_NAME,
            $this->getCredentialsForAzureContainer($this->containerName),
            $this->accountName,
            new CsvOptions(),
            true
        );
        $destination = new Storage\Snowflake\Table('schema', 'table');
        $options = new ImportOptions();
        $adapter = new Storage\ABS\SnowflakeImportAdapter($source);
        echo 'Generating commands' . PHP_EOL;
        $commands = $adapter->getCopyCommands(
            $destination,
            $options,
            'stagingTable'
        );
        foreach ($commands as $cmd) {
            $this->stopwatch->lap('commands');
        }
    }
}

0 commit comments

Comments
 (0)