From 671725658979258cadf097fc24f9c1fc217d9a17 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Jodas?= Date: Wed, 21 Aug 2024 21:51:39 +0200 Subject: [PATCH 1/8] migrate new buckets/tables --- src/Component.php | 1 + src/StorageModifier.php | 122 +++++++++++++++++++++++++++ src/Strategy/DatabaseMigrate.php | 41 +++++++-- src/Strategy/DatabaseReplication.php | 4 +- 4 files changed, 161 insertions(+), 7 deletions(-) create mode 100644 src/StorageModifier.php diff --git a/src/Component.php b/src/Component.php index 06f08a7..341cb53 100644 --- a/src/Component.php +++ b/src/Component.php @@ -73,6 +73,7 @@ protected function run(): void $strategy = new DatabaseMigrate( $this->getLogger(), $targetConnection, + $sourceSapiClient, new BranchAwareClient( $defaultBranch['id'], [ diff --git a/src/StorageModifier.php b/src/StorageModifier.php new file mode 100644 index 0000000..0d9c272 --- /dev/null +++ b/src/StorageModifier.php @@ -0,0 +1,122 @@ +tmp = new Temp(); + } + + public function createBucket(string $schemaName): void + { + list($bucketStage, $bucketName) = explode('.', $schemaName); + if (str_starts_with($bucketName, 'c-')) { + $bucketName = substr($bucketName, 2); + } + $this->client->createBucket($bucketName, $bucketStage); + } + + public function createTable(array $tableInfo): void + { + if ($tableInfo['isTyped']) { + $this->createTypedTable($tableInfo); + } else { + $this->createNonTypedTable($tableInfo); + } + } + + private function createNonTypedTable(array $tableInfo): void + { + $headerFile = $this->tmp->createFile(sprintf('%s.header.csv', $tableInfo['id'])); + $headerFile = new CsvFile($headerFile->getPathname()); + $headerFile->writeRow($tableInfo['columns']); + + $this->client->createTableAsync( + $tableInfo['bucket']['id'], + $tableInfo['name'], + $headerFile, + [ + 'primaryKey' => join(',', $tableInfo['primaryKey']), + ], + ); + } + + private function createTypedTable(array $tableInfo): void + { + $columns = []; + foreach ($tableInfo['columns'] as $column) { + $columns[$column] = []; + } + foreach ($tableInfo['columnMetadata'] ?? [] as $columnName => $column) { + $columnMetadata = []; + foreach ($column as $metadata) { + if ($metadata['provider'] !== 'storage') { + continue; + } + $columnMetadata[$metadata['key']] = $metadata['value']; + } + + $definition = [ + 'type' => $columnMetadata['KBC.datatype.type'], + 'nullable' => $columnMetadata['KBC.datatype.nullable'] === '1', + ]; + if (isset($columnMetadata['KBC.datatype.length'])) { + $definition['length'] = $columnMetadata['KBC.datatype.length']; + } + if (isset($columnMetadata['KBC.datatype.default'])) { + $definition['default'] = $columnMetadata['KBC.datatype.default']; + } + + $columns[$columnName] = [ + 'name' => $columnName, + 'definition' => $definition, + 'basetype' => $columnMetadata['KBC.datatype.basetype'], + ]; + } + + $data = [ + 'name' => $tableInfo['name'], + 'primaryKeysNames' => $tableInfo['primaryKey'], + 'columns' => array_values($columns), + ]; + + if ($tableInfo['bucket']['backend'] === 'synapse') { + $data['distribution'] = [ + 'type' => $tableInfo['distributionType'], + 'distributionColumnsNames' => $tableInfo['distributionKey'], + ]; + $data['index'] = [ + 'type' => $tableInfo['indexType'], + 'indexColumnsNames' => $tableInfo['indexKey'], + ]; + } + + try { + $this->client->createTableDefinition( + $tableInfo['bucket']['id'], + $data, + ); + } catch (ClientException $e) { + if ($e->getCode() === 400 + && str_contains($e->getMessage(), 'Primary keys columns must be set nullable false')) { + throw new StorageApiException(sprintf( + 'Table "%s" cannot be restored because the primary key cannot be set on a nullable column.', + $tableInfo['name'], + )); + } + throw $e; + } + } +} diff --git a/src/Strategy/DatabaseMigrate.php b/src/Strategy/DatabaseMigrate.php index b2d2853..3a92062 100644 --- a/src/Strategy/DatabaseMigrate.php +++ b/src/Strategy/DatabaseMigrate.php @@ -7,6 +7,7 @@ use Keboola\AppProjectMigrateLargeTables\Config; use Keboola\AppProjectMigrateLargeTables\MigrateInterface; use Keboola\AppProjectMigrateLargeTables\Snowflake\Connection; +use Keboola\AppProjectMigrateLargeTables\StorageModifier; use Keboola\SnowflakeDbAdapter\Exception\RuntimeException; use Keboola\SnowflakeDbAdapter\QueryBuilder; use Keboola\StorageApi\Client; @@ -19,15 +20,18 @@ class DatabaseMigrate implements MigrateInterface 'INFORMATION_SCHEMA', 'PUBLIC', ]; + private StorageModifier $storageModifier; public function __construct( private readonly LoggerInterface $logger, private readonly Connection $targetConnection, + private readonly Client $sourceSapiClient, private readonly Client $targetSapiClient, private readonly string $sourceDatabase, private readonly string $replicaDatabase, private readonly string $targetDatabase, ) { + $this->storageModifier = new StorageModifier($this->targetSapiClient); } public function migrate(Config $config): void @@ -62,19 +66,35 @@ public function migrate(Config $config): void 'USE DATABASE %s;', QueryBuilder::quoteIdentifier($this->targetDatabase), )); + + $currentRole = $this->targetConnection->getCurrentRole(); + $this->targetConnection->useRole('ACCOUNTADMIN'); $schemas = $this->targetConnection->fetchAll(sprintf( 'SHOW SCHEMAS IN DATABASE %s;', - QueryBuilder::quoteIdentifier($this->targetDatabase), + QueryBuilder::quoteIdentifier($this->replicaDatabase), )); + $this->targetConnection->useRole($currentRole); foreach ($schemas as $schema) { - if (in_array($schema['name'], self::SKIP_CLONE_SCHEMAS, true)) { + $schemaName = $schema['name']; + if (in_array($schemaName, self::SKIP_CLONE_SCHEMAS, true)) { + continue; + } + if (str_starts_with($schemaName, 'WORKSPACE')) { continue; } - if (str_starts_with($schema['name'], 'WORKSPACE')) { + + if (!$this->sourceSapiClient->bucketExists($schemaName)) { continue; } - $this->migrateSchema($config->getMigrateTables(), $schema['name']); + + if (!$this->targetSapiClient->bucketExists($schemaName)) { + $this->logger->info(sprintf('Creating bucket "%s".', $schemaName)); + $this->storageModifier->createBucket($schemaName); + } + + $this->migrateSchema($config->getMigrateTables(), $schemaName); + exit(); } $this->dropReplicaDatabase(); } @@ -82,16 +102,27 @@ public function migrate(Config $config): void private function migrateSchema(array $tablesWhiteList, string $schemaName): void { $this->logger->info(sprintf('Migrating schema %s', $schemaName)); + $currentRole = $this->targetConnection->getCurrentRole(); + $this->targetConnection->useRole('ACCOUNTADMIN'); $tables = $this->targetConnection->fetchAll(sprintf( - 'SHOW TABLES IN SCHEMA %s;', + 'SHOW TABLES IN SCHEMA %s.%s;', + QueryBuilder::quoteIdentifier($this->replicaDatabase), QueryBuilder::quoteIdentifier($schemaName), )); + $this->targetConnection->useRole($currentRole); foreach ($tables as $table) { $tableId = sprintf('%s.%s', $schemaName, $table['name']); if ($tablesWhiteList && !in_array($tableId, $tablesWhiteList, true)) { continue; } + if (!$this->targetSapiClient->tableExists($tableId)) { + $this->logger->info(sprintf('Creating table "%s".', $tableId)); + $this->storageModifier->createTable( + $this->sourceSapiClient->getTable($tableId), + ); + } + $this->migrateTable($schemaName, $table['name']); } diff --git a/src/Strategy/DatabaseReplication.php b/src/Strategy/DatabaseReplication.php index fe3bc5c..0c4344e 100644 --- a/src/Strategy/DatabaseReplication.php +++ b/src/Strategy/DatabaseReplication.php @@ -41,8 +41,8 @@ public function createReplication(string $sourceDatabase): void $this->sourceConnection->query(sprintf( 'ALTER DATABASE %s ENABLE REPLICATION TO ACCOUNTS %s.%s;', QueryBuilder::quoteIdentifier($sourceDatabase), - $this->targetConnection->getRegion(), - $this->targetConnection->getAccount(), + QueryBuilder::quoteIdentifier($this->targetConnection->getRegion()), + QueryBuilder::quoteIdentifier($this->targetConnection->getAccount()), )); } } From d68c6c5cee6ded005676f7b32f7948e9bdb24dc9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Jodas?= Date: Wed, 21 Aug 2024 21:57:32 +0200 Subject: [PATCH 2/8] migrate new buckets/tables --- src/Strategy/DatabaseMigrate.php | 1 - 1 file changed, 1 deletion(-) diff --git a/src/Strategy/DatabaseMigrate.php b/src/Strategy/DatabaseMigrate.php index 3a92062..520c8f6 100644 --- a/src/Strategy/DatabaseMigrate.php +++ b/src/Strategy/DatabaseMigrate.php @@ -94,7 +94,6 @@ public function migrate(Config $config): void } $this->migrateSchema($config->getMigrateTables(), $schemaName); - exit(); } $this->dropReplicaDatabase(); } From 94c87b3ab1978c5839549d13440f503b354b2457 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Jodas?= Date: Thu, 22 Aug 2024 22:49:21 +0200 Subject: [PATCH 3/8] migrate table/columns metadata --- src/StorageModifier.php | 50 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 50 insertions(+) diff --git a/src/StorageModifier.php b/src/StorageModifier.php index 0d9c272..174c091 100644 --- a/src/StorageModifier.php +++ b/src/StorageModifier.php @@ -8,6 +8,8 @@ use Keboola\StorageApi\Client; use Keboola\StorageApi\ClientException; use Keboola\StorageApi\Exception as StorageApiException; +use Keboola\StorageApi\Metadata; +use Keboola\StorageApi\Options\Metadata\TableMetadataUpdateOptions; use Keboola\Temp\Temp; class StorageModifier @@ -35,6 +37,8 @@ public function createTable(array $tableInfo): void } else { $this->createNonTypedTable($tableInfo); } + + $this->restoreTableColumnsMetadata($tableInfo, $tableInfo['id'], new Metadata($this->client)); } private function createNonTypedTable(array $tableInfo): void @@ -119,4 +123,50 @@ private function createTypedTable(array $tableInfo): void throw $e; } } + + private function restoreTableColumnsMetadata(array $tableInfo, string $tableId, Metadata $metadataClient): void + { + $metadatas = []; + if (isset($tableInfo['metadata']) && count($tableInfo['metadata'])) { + foreach ($this->prepareMetadata($tableInfo['metadata']) as $provider => $metadata) { + $metadatas[$provider]['table'] = $metadata; + } + } + if (isset($tableInfo['columnMetadata']) && count($tableInfo['columnMetadata'])) { + foreach ($tableInfo['columnMetadata'] as $column => $columnMetadata) { + foreach ($this->prepareMetadata($columnMetadata) as $provider => $metadata) { + if ($metadata !== []) { + $metadatas[$provider]['columns'][$column] = $metadata; + } + } + } + } + + /** @var array $metadata */ + foreach ($metadatas as $provider => $metadata) { + if ($provider === 'storage') { + continue; + } + $tableMetadataUpdateOptions = new TableMetadataUpdateOptions( + $tableId, + (string) $provider, + $metadata['table'] ?? null, + $metadata['columns'] ?? null, + ); + + $metadataClient->postTableMetadataWithColumns($tableMetadataUpdateOptions); + } + } + + private function prepareMetadata(array $rawMetadata): array + { + $result = []; + foreach ($rawMetadata as $item) { + $result[$item['provider']][] = [ + 'key' => $item['key'], + 'value' => $item['value'], + ]; + } + return $result; + } } From 1b15f024d058692694b5fb24eecd5d99c1253aa5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Jodas?= Date: Sun, 25 Aug 2024 22:59:05 +0200 Subject: [PATCH 4/8] migrate table/columns in sapi mode --- src/Strategy/SapiMigrate.php | 41 +++++++++++++++++++++++++++++++----- 1 file changed, 36 insertions(+), 5 deletions(-) diff --git a/src/Strategy/SapiMigrate.php b/src/Strategy/SapiMigrate.php index f010637..81fec69 100644 --- a/src/Strategy/SapiMigrate.php +++ b/src/Strategy/SapiMigrate.php @@ -6,6 +6,7 @@ use Keboola\AppProjectMigrateLargeTables\Config; use Keboola\AppProjectMigrateLargeTables\MigrateInterface; +use Keboola\AppProjectMigrateLargeTables\StorageModifier; use Keboola\StorageApi\Client; use Keboola\StorageApi\ClientException; use Keboola\StorageApi\Options\FileUploadOptions; @@ -14,11 +15,17 @@ class SapiMigrate implements MigrateInterface { + private StorageModifier $storageModifier; + + /** @var string[] $bucketsExist */ + private array $bucketsExist = []; + public function __construct( private readonly Client $sourceClient, private readonly Client $targetClient, private readonly LoggerInterface $logger, ) { + $this->storageModifier = new StorageModifier($this->targetClient); } public function migrate(Config $config): void @@ -44,6 +51,20 @@ public function migrate(Config $config): void continue; } + if (!in_array($tableInfo['bucket']['id'], $this->bucketsExist) && + !$this->targetClient->bucketExists($tableInfo['bucket']['id'])) { + $this->logger->info(sprintf('Creating bucket %s', $tableInfo['bucket']['id'])); + $this->bucketsExist[] = $tableInfo['bucket']['id']; + + $this->storageModifier->createBucket($tableInfo['bucket']['id']); + } + + if (!$this->targetClient->tableExists($tableId)) { + $this->logger->info(sprintf('Creating table %s', $tableInfo['id'])); + + $this->storageModifier->createTable($tableInfo); + } + $this->migrateTable($tableInfo); } } @@ -95,15 +116,25 @@ private function migrateTable(array $sourceTableInfo): void private function getAllTables(): array { - $buckets = $this->targetClient->listBuckets(); + $buckets = $this->sourceClient->listBuckets(); $listTables = []; foreach ($buckets as $bucket) { - $bucketTables = $this->targetClient->listTables($bucket['id']); + $sourceBucketTables = $this->sourceClient->listTables($bucket['id']); + if (!$this->targetClient->bucketExists($bucket['id'])) { + $targetBucketTables = []; + } else { + $targetBucketTables = $this->targetClient->listTables($bucket['id']); + } - // migrate only empty tables $filteredBucketTables = array_filter( - $bucketTables, - fn($v) => $v['rowsCount'] === 0 || is_null($v['rowsCount']), + $sourceBucketTables, + function ($sourceTable) use ($targetBucketTables) { + $v = current(array_filter( + $targetBucketTables, + fn($v) => $v['id'] === $sourceTable['id'], + )); + return empty($v) || $v['rowsCount'] === 0 || is_null($v['rowsCount']); + }, ); $listTables = array_merge( From bd2ea001524634f9454e44971eae3fb0b11719bb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Jodas?= Date: Tue, 3 Sep 2024 21:54:43 +0200 Subject: [PATCH 5/8] convert $columnName to string --- src/StorageModifier.php | 1 + 1 file changed, 1 insertion(+) diff --git a/src/StorageModifier.php b/src/StorageModifier.php index 174c091..27ace8a 100644 --- a/src/StorageModifier.php +++ b/src/StorageModifier.php @@ -64,6 +64,7 @@ private function createTypedTable(array $tableInfo): void $columns[$column] = []; } foreach ($tableInfo['columnMetadata'] ?? [] as $columnName => $column) { + $columnName = strval($columnName); $columnMetadata = []; foreach ($column as $metadata) { if ($metadata['provider'] !== 'storage') { From 5412d878c407a4ffc660d3c80f074e3b8d3687a6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Jodas?= <12143866+ondrajodas@users.noreply.github.com> Date: Wed, 4 Sep 2024 20:46:48 +0200 Subject: [PATCH 6/8] Update src/StorageModifier.php MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Roman Pištěk --- src/StorageModifier.php | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/StorageModifier.php b/src/StorageModifier.php index 27ace8a..830c817 100644 --- a/src/StorageModifier.php +++ b/src/StorageModifier.php @@ -23,7 +23,7 @@ public function __construct(readonly Client $client) public function createBucket(string $schemaName): void { - list($bucketStage, $bucketName) = explode('.', $schemaName); + [$bucketStage, $bucketName] = explode('.', $schemaName); if (str_starts_with($bucketName, 'c-')) { $bucketName = substr($bucketName, 2); } From 13657c0f52fbf67509d4e0c1ab1ee10b15b9b16e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Jodas?= <12143866+ondrajodas@users.noreply.github.com> Date: Wed, 4 Sep 2024 20:47:23 +0200 Subject: [PATCH 7/8] Update src/StorageModifier.php MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Roman Pištěk --- src/StorageModifier.php | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/src/StorageModifier.php b/src/StorageModifier.php index 830c817..1450199 100644 --- a/src/StorageModifier.php +++ b/src/StorageModifier.php @@ -133,12 +133,10 @@ private function restoreTableColumnsMetadata(array $tableInfo, string $tableId, $metadatas[$provider]['table'] = $metadata; } } - if (isset($tableInfo['columnMetadata']) && count($tableInfo['columnMetadata'])) { - foreach ($tableInfo['columnMetadata'] as $column => $columnMetadata) { - foreach ($this->prepareMetadata($columnMetadata) as $provider => $metadata) { - if ($metadata !== []) { - $metadatas[$provider]['columns'][$column] = $metadata; - } + foreach ($tableInfo['columnMetadata'] ?? [] as $column => $columnMetadata) { + foreach ($this->prepareMetadata($columnMetadata) as $provider => $metadata) { + if ($metadata !== []) { + $metadatas[$provider]['columns'][$column] = $metadata; } } } From 572c61635ef5cf489b43166e89ac491cea53111e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ond=C5=99ej=20Jodas?= <12143866+ondrajodas@users.noreply.github.com> Date: Wed, 4 Sep 2024 20:48:27 +0200 Subject: [PATCH 8/8] Apply suggestions from code review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Roman Pištěk --- src/StorageModifier.php | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/StorageModifier.php b/src/StorageModifier.php index 1450199..04fcce5 100644 --- a/src/StorageModifier.php +++ b/src/StorageModifier.php @@ -43,8 +43,8 @@ public function createTable(array $tableInfo): void private function createNonTypedTable(array $tableInfo): void { - $headerFile = $this->tmp->createFile(sprintf('%s.header.csv', $tableInfo['id'])); - $headerFile = new CsvFile($headerFile->getPathname()); + $tempFile = $this->tmp->createFile(sprintf('%s.header.csv', $tableInfo['id'])); + $headerFile = new CsvFile($tempFile->getPathname()); $headerFile->writeRow($tableInfo['columns']); $this->client->createTableAsync( @@ -64,7 +64,7 @@ private function createTypedTable(array $tableInfo): void $columns[$column] = []; } foreach ($tableInfo['columnMetadata'] ?? [] as $columnName => $column) { - $columnName = strval($columnName); + $columnName = (string) $columnName; $columnMetadata = []; foreach ($column as $metadata) { if ($metadata['provider'] !== 'storage') {