diff --git a/src/Component.php b/src/Component.php index 06f08a7..341cb53 100644 --- a/src/Component.php +++ b/src/Component.php @@ -73,6 +73,7 @@ protected function run(): void $strategy = new DatabaseMigrate( $this->getLogger(), $targetConnection, + $sourceSapiClient, new BranchAwareClient( $defaultBranch['id'], [ diff --git a/src/StorageModifier.php b/src/StorageModifier.php new file mode 100644 index 0000000..04fcce5 --- /dev/null +++ b/src/StorageModifier.php @@ -0,0 +1,171 @@ +tmp = new Temp(); + } + + public function createBucket(string $schemaName): void + { + [$bucketStage, $bucketName] = explode('.', $schemaName); + if (str_starts_with($bucketName, 'c-')) { + $bucketName = substr($bucketName, 2); + } + $this->client->createBucket($bucketName, $bucketStage); + } + + public function createTable(array $tableInfo): void + { + if ($tableInfo['isTyped']) { + $this->createTypedTable($tableInfo); + } else { + $this->createNonTypedTable($tableInfo); + } + + $this->restoreTableColumnsMetadata($tableInfo, $tableInfo['id'], new Metadata($this->client)); + } + + private function createNonTypedTable(array $tableInfo): void + { + $tempFile = $this->tmp->createFile(sprintf('%s.header.csv', $tableInfo['id'])); + $headerFile = new CsvFile($tempFile->getPathname()); + $headerFile->writeRow($tableInfo['columns']); + + $this->client->createTableAsync( + $tableInfo['bucket']['id'], + $tableInfo['name'], + $headerFile, + [ + 'primaryKey' => join(',', $tableInfo['primaryKey']), + ], + ); + } + + private function createTypedTable(array $tableInfo): void + { + $columns = []; + foreach ($tableInfo['columns'] as $column) { + $columns[$column] = []; + } + foreach ($tableInfo['columnMetadata'] ?? [] as $columnName => $column) { + $columnName = (string) $columnName; + $columnMetadata = []; + foreach ($column as $metadata) { + if ($metadata['provider'] !== 'storage') { + continue; + } + $columnMetadata[$metadata['key']] = $metadata['value']; + } + + $definition = [ + 'type' => $columnMetadata['KBC.datatype.type'], + 'nullable' => $columnMetadata['KBC.datatype.nullable'] === '1', + ]; + if (isset($columnMetadata['KBC.datatype.length'])) { + $definition['length'] = $columnMetadata['KBC.datatype.length']; + } + if (isset($columnMetadata['KBC.datatype.default'])) { + $definition['default'] = $columnMetadata['KBC.datatype.default']; + } + + $columns[$columnName] = [ + 'name' => $columnName, + 'definition' => $definition, + 'basetype' => $columnMetadata['KBC.datatype.basetype'], + ]; + } + + $data = [ + 'name' => $tableInfo['name'], + 'primaryKeysNames' => $tableInfo['primaryKey'], + 'columns' => array_values($columns), + ]; + + if ($tableInfo['bucket']['backend'] === 'synapse') { + $data['distribution'] = [ + 'type' => $tableInfo['distributionType'], + 'distributionColumnsNames' => $tableInfo['distributionKey'], + ]; + $data['index'] = [ + 'type' => $tableInfo['indexType'], + 'indexColumnsNames' => $tableInfo['indexKey'], + ]; + } + + try { + $this->client->createTableDefinition( + $tableInfo['bucket']['id'], + $data, + ); + } catch (ClientException $e) { + if ($e->getCode() === 400 + && str_contains($e->getMessage(), 'Primary keys columns must be set nullable false')) { + throw new StorageApiException(sprintf( + 'Table "%s" cannot be restored because the primary key cannot be set on a nullable column.', + $tableInfo['name'], + )); + } + throw $e; + } + } + + private function restoreTableColumnsMetadata(array $tableInfo, string $tableId, Metadata $metadataClient): void + { + $metadatas = []; + if (isset($tableInfo['metadata']) && count($tableInfo['metadata'])) { + foreach ($this->prepareMetadata($tableInfo['metadata']) as $provider => $metadata) { + $metadatas[$provider]['table'] = $metadata; + } + } + foreach ($tableInfo['columnMetadata'] ?? [] as $column => $columnMetadata) { + foreach ($this->prepareMetadata($columnMetadata) as $provider => $metadata) { + if ($metadata !== []) { + $metadatas[$provider]['columns'][$column] = $metadata; + } + } + } + + /** @var array $metadata */ + foreach ($metadatas as $provider => $metadata) { + if ($provider === 'storage') { + continue; + } + $tableMetadataUpdateOptions = new TableMetadataUpdateOptions( + $tableId, + (string) $provider, + $metadata['table'] ?? null, + $metadata['columns'] ?? null, + ); + + $metadataClient->postTableMetadataWithColumns($tableMetadataUpdateOptions); + } + } + + private function prepareMetadata(array $rawMetadata): array + { + $result = []; + foreach ($rawMetadata as $item) { + $result[$item['provider']][] = [ + 'key' => $item['key'], + 'value' => $item['value'], + ]; + } + return $result; + } +} diff --git a/src/Strategy/DatabaseMigrate.php b/src/Strategy/DatabaseMigrate.php index b2d2853..520c8f6 100644 --- a/src/Strategy/DatabaseMigrate.php +++ b/src/Strategy/DatabaseMigrate.php @@ -7,6 +7,7 @@ use Keboola\AppProjectMigrateLargeTables\Config; use Keboola\AppProjectMigrateLargeTables\MigrateInterface; use Keboola\AppProjectMigrateLargeTables\Snowflake\Connection; +use Keboola\AppProjectMigrateLargeTables\StorageModifier; use Keboola\SnowflakeDbAdapter\Exception\RuntimeException; use Keboola\SnowflakeDbAdapter\QueryBuilder; use Keboola\StorageApi\Client; @@ -19,15 +20,18 @@ class DatabaseMigrate implements MigrateInterface 'INFORMATION_SCHEMA', 'PUBLIC', ]; + private StorageModifier $storageModifier; public function __construct( private readonly LoggerInterface $logger, private readonly Connection $targetConnection, + private readonly Client $sourceSapiClient, private readonly Client $targetSapiClient, private readonly string $sourceDatabase, private readonly string $replicaDatabase, private readonly string $targetDatabase, ) { + $this->storageModifier = new StorageModifier($this->targetSapiClient); } public function migrate(Config $config): void @@ -62,19 +66,34 @@ public function migrate(Config $config): void 'USE DATABASE %s;', QueryBuilder::quoteIdentifier($this->targetDatabase), )); + + $currentRole = $this->targetConnection->getCurrentRole(); + $this->targetConnection->useRole('ACCOUNTADMIN'); $schemas = $this->targetConnection->fetchAll(sprintf( 'SHOW SCHEMAS IN DATABASE %s;', - QueryBuilder::quoteIdentifier($this->targetDatabase), + QueryBuilder::quoteIdentifier($this->replicaDatabase), )); + $this->targetConnection->useRole($currentRole); foreach ($schemas as $schema) { - if (in_array($schema['name'], self::SKIP_CLONE_SCHEMAS, true)) { + $schemaName = $schema['name']; + if (in_array($schemaName, self::SKIP_CLONE_SCHEMAS, true)) { + continue; + } + if (str_starts_with($schemaName, 'WORKSPACE')) { continue; } - if (str_starts_with($schema['name'], 'WORKSPACE')) { + + if (!$this->sourceSapiClient->bucketExists($schemaName)) { continue; } - $this->migrateSchema($config->getMigrateTables(), $schema['name']); + + if (!$this->targetSapiClient->bucketExists($schemaName)) { + $this->logger->info(sprintf('Creating bucket "%s".', $schemaName)); + $this->storageModifier->createBucket($schemaName); + } + + $this->migrateSchema($config->getMigrateTables(), $schemaName); } $this->dropReplicaDatabase(); } @@ -82,16 +101,27 @@ public function migrate(Config $config): void private function migrateSchema(array $tablesWhiteList, string $schemaName): void { $this->logger->info(sprintf('Migrating schema %s', $schemaName)); + $currentRole = $this->targetConnection->getCurrentRole(); + $this->targetConnection->useRole('ACCOUNTADMIN'); $tables = $this->targetConnection->fetchAll(sprintf( - 'SHOW TABLES IN SCHEMA %s;', + 'SHOW TABLES IN SCHEMA %s.%s;', + QueryBuilder::quoteIdentifier($this->replicaDatabase), QueryBuilder::quoteIdentifier($schemaName), )); + $this->targetConnection->useRole($currentRole); foreach ($tables as $table) { $tableId = sprintf('%s.%s', $schemaName, $table['name']); if ($tablesWhiteList && !in_array($tableId, $tablesWhiteList, true)) { continue; } + if (!$this->targetSapiClient->tableExists($tableId)) { + $this->logger->info(sprintf('Creating table "%s".', $tableId)); + $this->storageModifier->createTable( + $this->sourceSapiClient->getTable($tableId), + ); + } + $this->migrateTable($schemaName, $table['name']); } diff --git a/src/Strategy/DatabaseReplication.php b/src/Strategy/DatabaseReplication.php index fe3bc5c..0c4344e 100644 --- a/src/Strategy/DatabaseReplication.php +++ b/src/Strategy/DatabaseReplication.php @@ -41,8 +41,8 @@ public function createReplication(string $sourceDatabase): void $this->sourceConnection->query(sprintf( 'ALTER DATABASE %s ENABLE REPLICATION TO ACCOUNTS %s.%s;', QueryBuilder::quoteIdentifier($sourceDatabase), - $this->targetConnection->getRegion(), - $this->targetConnection->getAccount(), + QueryBuilder::quoteIdentifier($this->targetConnection->getRegion()), + QueryBuilder::quoteIdentifier($this->targetConnection->getAccount()), )); } } diff --git a/src/Strategy/SapiMigrate.php b/src/Strategy/SapiMigrate.php index f010637..81fec69 100644 --- a/src/Strategy/SapiMigrate.php +++ b/src/Strategy/SapiMigrate.php @@ -6,6 +6,7 @@ use Keboola\AppProjectMigrateLargeTables\Config; use Keboola\AppProjectMigrateLargeTables\MigrateInterface; +use Keboola\AppProjectMigrateLargeTables\StorageModifier; use Keboola\StorageApi\Client; use Keboola\StorageApi\ClientException; use Keboola\StorageApi\Options\FileUploadOptions; @@ -14,11 +15,17 @@ class SapiMigrate implements MigrateInterface { + private StorageModifier $storageModifier; + + /** @var string[] $bucketsExist */ + private array $bucketsExist = []; + public function __construct( private readonly Client $sourceClient, private readonly Client $targetClient, private readonly LoggerInterface $logger, ) { + $this->storageModifier = new StorageModifier($this->targetClient); } public function migrate(Config $config): void @@ -44,6 +51,20 @@ public function migrate(Config $config): void continue; } + if (!in_array($tableInfo['bucket']['id'], $this->bucketsExist) && + !$this->targetClient->bucketExists($tableInfo['bucket']['id'])) { + $this->logger->info(sprintf('Creating bucket %s', $tableInfo['bucket']['id'])); + $this->bucketsExist[] = $tableInfo['bucket']['id']; + + $this->storageModifier->createBucket($tableInfo['bucket']['id']); + } + + if (!$this->targetClient->tableExists($tableId)) { + $this->logger->info(sprintf('Creating table %s', $tableInfo['id'])); + + $this->storageModifier->createTable($tableInfo); + } + $this->migrateTable($tableInfo); } } @@ -95,15 +116,25 @@ private function migrateTable(array $sourceTableInfo): void private function getAllTables(): array { - $buckets = $this->targetClient->listBuckets(); + $buckets = $this->sourceClient->listBuckets(); $listTables = []; foreach ($buckets as $bucket) { - $bucketTables = $this->targetClient->listTables($bucket['id']); + $sourceBucketTables = $this->sourceClient->listTables($bucket['id']); + if (!$this->targetClient->bucketExists($bucket['id'])) { + $targetBucketTables = []; + } else { + $targetBucketTables = $this->targetClient->listTables($bucket['id']); + } - // migrate only empty tables $filteredBucketTables = array_filter( - $bucketTables, - fn($v) => $v['rowsCount'] === 0 || is_null($v['rowsCount']), + $sourceBucketTables, + function ($sourceTable) use ($targetBucketTables) { + $v = current(array_filter( + $targetBucketTables, + fn($v) => $v['id'] === $sourceTable['id'], + )); + return empty($v) || $v['rowsCount'] === 0 || is_null($v['rowsCount']); + }, ); $listTables = array_merge(