Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

migrate new buckets/tables #14

Merged
merged 8 commits into from
Sep 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/Component.php
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,7 @@ protected function run(): void
$strategy = new DatabaseMigrate(
$this->getLogger(),
$targetConnection,
$sourceSapiClient,
new BranchAwareClient(
$defaultBranch['id'],
[
Expand Down
171 changes: 171 additions & 0 deletions src/StorageModifier.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,171 @@
<?php

declare(strict_types=1);

namespace Keboola\AppProjectMigrateLargeTables;

use Keboola\Csv\CsvFile;
use Keboola\StorageApi\Client;
use Keboola\StorageApi\ClientException;
use Keboola\StorageApi\Exception as StorageApiException;
use Keboola\StorageApi\Metadata;
use Keboola\StorageApi\Options\Metadata\TableMetadataUpdateOptions;
use Keboola\Temp\Temp;

/**
 * Creates buckets and tables through the Storage API client it is given,
 * based on bucket IDs and table-detail arrays supplied by the caller.
 */
class StorageModifier
{
    private Temp $tmp;

    /**
     * @param Client $client Storage API client used for every create/metadata call made by this class.
     */
    public function __construct(public readonly Client $client)
    {
        $this->tmp = new Temp();
    }

    /**
     * Creates a bucket from a bucket ID of the form "<stage>.<name>".
     *
     * A leading "c-" on the name part is removed before the client is called
     * (presumably because Client::createBucket() adds that prefix itself —
     * confirm against the client library).
     *
     * @param string $schemaName Bucket ID, e.g. "in.c-main".
     * @throws StorageApiException When the ID does not contain a stage/name separator.
     */
    public function createBucket(string $schemaName): void
    {
        // Split on the first dot only, so nothing after it is silently dropped.
        $parts = explode('.', $schemaName, 2);
        if (count($parts) !== 2) {
            throw new StorageApiException(sprintf(
                'Bucket ID "%s" is not in the expected "<stage>.<name>" format.',
                $schemaName,
            ));
        }

        [$bucketStage, $bucketName] = $parts;
        if (str_starts_with($bucketName, 'c-')) {
            $bucketName = substr($bucketName, 2);
        }

        $this->client->createBucket($bucketName, $bucketStage);
    }

    /**
     * Creates a table (typed or non-typed, depending on $tableInfo['isTyped'])
     * and then re-applies its table and column metadata.
     *
     * @param array $tableInfo Table detail; this method reads the "isTyped" and "id" keys,
     *                         the helpers read further keys (name, bucket, columns, primaryKey, ...).
     */
    public function createTable(array $tableInfo): void
    {
        if ($tableInfo['isTyped']) {
            $this->createTypedTable($tableInfo);
        } else {
            $this->createNonTypedTable($tableInfo);
        }

        $this->restoreTableColumnsMetadata($tableInfo, $tableInfo['id'], new Metadata($this->client));
    }

    /**
     * Creates a non-typed table from a CSV file that contains only the header row.
     *
     * @param array $tableInfo Reads "id", "columns", "name", "bucket.id" and "primaryKey".
     */
    private function createNonTypedTable(array $tableInfo): void
    {
        // The temporary CSV holds a single row: the column names.
        $tempFile = $this->tmp->createFile(sprintf('%s.header.csv', $tableInfo['id']));
        $headerFile = new CsvFile($tempFile->getPathname());
        $headerFile->writeRow($tableInfo['columns']);

        $this->client->createTableAsync(
            $tableInfo['bucket']['id'],
            $tableInfo['name'],
            $headerFile,
            [
                'primaryKey' => implode(',', $tableInfo['primaryKey']),
            ],
        );
    }

    /**
     * Creates a typed table whose column definitions are rebuilt from the
     * "storage"-provider column metadata (KBC.datatype.* keys).
     *
     * @param array $tableInfo Reads "columns", "columnMetadata", "name", "primaryKey",
     *                         "bucket.id", "bucket.backend" and, for the synapse backend,
     *                         "distributionType", "distributionKey", "indexType", "indexKey".
     * @throws StorageApiException When the API rejects the table because a primary key column is nullable.
     * @throws ClientException Any other API failure is rethrown unchanged.
     */
    private function createTypedTable(array $tableInfo): void
    {
        // Register the names first: this fixes the column order to that of
        // $tableInfo['columns']; the entries are overwritten below.
        $columns = [];
        foreach ($tableInfo['columns'] as $column) {
            $columns[$column] = [];
        }

        foreach ($tableInfo['columnMetadata'] ?? [] as $columnName => $column) {
            // Array keys that look like integers are stored as int by PHP.
            $columnName = (string) $columnName;

            // Only metadata written by the "storage" provider is used for the definition.
            $columnMetadata = [];
            foreach ($column as $metadata) {
                if ($metadata['provider'] !== 'storage') {
                    continue;
                }
                $columnMetadata[$metadata['key']] = $metadata['value'];
            }

            $definition = [
                'type' => $columnMetadata['KBC.datatype.type'],
                'nullable' => $columnMetadata['KBC.datatype.nullable'] === '1',
            ];
            if (isset($columnMetadata['KBC.datatype.length'])) {
                $definition['length'] = $columnMetadata['KBC.datatype.length'];
            }
            if (isset($columnMetadata['KBC.datatype.default'])) {
                $definition['default'] = $columnMetadata['KBC.datatype.default'];
            }

            $columns[$columnName] = [
                'name' => $columnName,
                'definition' => $definition,
                'basetype' => $columnMetadata['KBC.datatype.basetype'],
            ];
        }

        $data = [
            'name' => $tableInfo['name'],
            'primaryKeysNames' => $tableInfo['primaryKey'],
            'columns' => array_values($columns),
        ];

        if ($tableInfo['bucket']['backend'] === 'synapse') {
            $data['distribution'] = [
                'type' => $tableInfo['distributionType'],
                'distributionColumnsNames' => $tableInfo['distributionKey'],
            ];
            $data['index'] = [
                'type' => $tableInfo['indexType'],
                'indexColumnsNames' => $tableInfo['indexKey'],
            ];
        }

        try {
            $this->client->createTableDefinition(
                $tableInfo['bucket']['id'],
                $data,
            );
        } catch (ClientException $e) {
            $isNullablePrimaryKey = $e->getCode() === 400
                && str_contains($e->getMessage(), 'Primary keys columns must be set nullable false');
            if (!$isNullablePrimaryKey) {
                throw $e;
            }

            // Replace the API message with a clearer one, but keep the original
            // exception (and its code) attached for debugging.
            throw new StorageApiException(
                sprintf(
                    'Table "%s" cannot be restored because the primary key cannot be set on a nullable column.',
                    $tableInfo['name'],
                ),
                $e->getCode(),
                $e,
            );
        }
    }

    /**
     * Posts table-level and column-level metadata, one request per provider.
     * Metadata of the "storage" provider is not posted.
     *
     * @param array $tableInfo Reads the optional "metadata" and "columnMetadata" keys.
     * @param string $tableId ID of the table the metadata is written to.
     * @param Metadata $metadataClient Client used to post the metadata.
     */
    private function restoreTableColumnsMetadata(array $tableInfo, string $tableId, Metadata $metadataClient): void
    {
        // Shape: [provider => ['table' => [...], 'columns' => [column => [...]]]]
        $metadatas = [];
        if (!empty($tableInfo['metadata'])) {
            foreach ($this->prepareMetadata($tableInfo['metadata']) as $provider => $metadata) {
                $metadatas[$provider]['table'] = $metadata;
            }
        }
        foreach ($tableInfo['columnMetadata'] ?? [] as $column => $columnMetadata) {
            foreach ($this->prepareMetadata($columnMetadata) as $provider => $metadata) {
                if ($metadata !== []) {
                    $metadatas[$provider]['columns'][$column] = $metadata;
                }
            }
        }

        /** @var array $metadata */
        foreach ($metadatas as $provider => $metadata) {
            if ($provider === 'storage') {
                continue;
            }

            $tableMetadataUpdateOptions = new TableMetadataUpdateOptions(
                $tableId,
                (string) $provider,
                $metadata['table'] ?? null,
                $metadata['columns'] ?? null,
            );

            $metadataClient->postTableMetadataWithColumns($tableMetadataUpdateOptions);
        }
    }

    /**
     * Groups raw metadata items by provider, keeping only "key" and "value" of each item.
     *
     * @param array $rawMetadata List of items, each with "provider", "key" and "value".
     * @return array Map of provider => list of ['key' => ..., 'value' => ...].
     */
    private function prepareMetadata(array $rawMetadata): array
    {
        $result = [];
        foreach ($rawMetadata as $item) {
            $result[$item['provider']][] = [
                'key' => $item['key'],
                'value' => $item['value'],
            ];
        }

        return $result;
    }
}
40 changes: 35 additions & 5 deletions src/Strategy/DatabaseMigrate.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
use Keboola\AppProjectMigrateLargeTables\Config;
use Keboola\AppProjectMigrateLargeTables\MigrateInterface;
use Keboola\AppProjectMigrateLargeTables\Snowflake\Connection;
use Keboola\AppProjectMigrateLargeTables\StorageModifier;
use Keboola\SnowflakeDbAdapter\Exception\RuntimeException;
use Keboola\SnowflakeDbAdapter\QueryBuilder;
use Keboola\StorageApi\Client;
Expand All @@ -19,15 +20,18 @@ class DatabaseMigrate implements MigrateInterface
'INFORMATION_SCHEMA',
'PUBLIC',
];
private StorageModifier $storageModifier;

/**
 * Stores the injected dependencies as promoted readonly properties and
 * builds the StorageModifier around the target Storage API client.
 *
 * @param LoggerInterface $logger Logger used for progress messages.
 * @param Connection $targetConnection Snowflake connection the migration queries run on.
 * @param Client $sourceSapiClient Storage API client of the source project.
 * @param Client $targetSapiClient Storage API client of the target project; also handed to StorageModifier.
 * @param string $sourceDatabase Name of the source database (usage not visible here — confirm).
 * @param string $replicaDatabase Name of the replica database.
 * @param string $targetDatabase Name of the target database.
 */
public function __construct(
private readonly LoggerInterface $logger,
private readonly Connection $targetConnection,
private readonly Client $sourceSapiClient,
private readonly Client $targetSapiClient,
private readonly string $sourceDatabase,
private readonly string $replicaDatabase,
private readonly string $targetDatabase,
) {
// Buckets and tables missing in the target project are created through this helper.
$this->storageModifier = new StorageModifier($this->targetSapiClient);
}

public function migrate(Config $config): void
Expand Down Expand Up @@ -62,36 +66,62 @@ public function migrate(Config $config): void
'USE DATABASE %s;',
QueryBuilder::quoteIdentifier($this->targetDatabase),
));

$currentRole = $this->targetConnection->getCurrentRole();
$this->targetConnection->useRole('ACCOUNTADMIN');
$schemas = $this->targetConnection->fetchAll(sprintf(
'SHOW SCHEMAS IN DATABASE %s;',
QueryBuilder::quoteIdentifier($this->targetDatabase),
QueryBuilder::quoteIdentifier($this->replicaDatabase),
));
$this->targetConnection->useRole($currentRole);

foreach ($schemas as $schema) {
if (in_array($schema['name'], self::SKIP_CLONE_SCHEMAS, true)) {
$schemaName = $schema['name'];
if (in_array($schemaName, self::SKIP_CLONE_SCHEMAS, true)) {
continue;
}
if (str_starts_with($schemaName, 'WORKSPACE')) {
continue;
}
if (str_starts_with($schema['name'], 'WORKSPACE')) {

if (!$this->sourceSapiClient->bucketExists($schemaName)) {
continue;
}
$this->migrateSchema($config->getMigrateTables(), $schema['name']);

if (!$this->targetSapiClient->bucketExists($schemaName)) {
$this->logger->info(sprintf('Creating bucket "%s".', $schemaName));
$this->storageModifier->createBucket($schemaName);
}

$this->migrateSchema($config->getMigrateTables(), $schemaName);
}
$this->dropReplicaDatabase();
}

private function migrateSchema(array $tablesWhiteList, string $schemaName): void
{
$this->logger->info(sprintf('Migrating schema %s', $schemaName));
$currentRole = $this->targetConnection->getCurrentRole();
$this->targetConnection->useRole('ACCOUNTADMIN');
$tables = $this->targetConnection->fetchAll(sprintf(
'SHOW TABLES IN SCHEMA %s;',
'SHOW TABLES IN SCHEMA %s.%s;',
QueryBuilder::quoteIdentifier($this->replicaDatabase),
QueryBuilder::quoteIdentifier($schemaName),
));
$this->targetConnection->useRole($currentRole);

foreach ($tables as $table) {
$tableId = sprintf('%s.%s', $schemaName, $table['name']);
if ($tablesWhiteList && !in_array($tableId, $tablesWhiteList, true)) {
continue;
}
if (!$this->targetSapiClient->tableExists($tableId)) {
$this->logger->info(sprintf('Creating table "%s".', $tableId));
$this->storageModifier->createTable(
$this->sourceSapiClient->getTable($tableId),
);
}

$this->migrateTable($schemaName, $table['name']);
}

Expand Down
4 changes: 2 additions & 2 deletions src/Strategy/DatabaseReplication.php
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ public function createReplication(string $sourceDatabase): void
$this->sourceConnection->query(sprintf(
'ALTER DATABASE %s ENABLE REPLICATION TO ACCOUNTS %s.%s;',
QueryBuilder::quoteIdentifier($sourceDatabase),
$this->targetConnection->getRegion(),
$this->targetConnection->getAccount(),
QueryBuilder::quoteIdentifier($this->targetConnection->getRegion()),
QueryBuilder::quoteIdentifier($this->targetConnection->getAccount()),
));
}
}
41 changes: 36 additions & 5 deletions src/Strategy/SapiMigrate.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

use Keboola\AppProjectMigrateLargeTables\Config;
use Keboola\AppProjectMigrateLargeTables\MigrateInterface;
use Keboola\AppProjectMigrateLargeTables\StorageModifier;
use Keboola\StorageApi\Client;
use Keboola\StorageApi\ClientException;
use Keboola\StorageApi\Options\FileUploadOptions;
Expand All @@ -14,11 +15,17 @@

class SapiMigrate implements MigrateInterface
{
private StorageModifier $storageModifier;

/** @var string[] $bucketsExist */
private array $bucketsExist = [];

/**
 * Stores the injected dependencies as promoted readonly properties and
 * builds the StorageModifier around the target Storage API client.
 *
 * @param Client $sourceClient Storage API client of the source project.
 * @param Client $targetClient Storage API client of the target project; also handed to StorageModifier.
 * @param LoggerInterface $logger Logger used for progress messages.
 */
public function __construct(
private readonly Client $sourceClient,
private readonly Client $targetClient,
private readonly LoggerInterface $logger,
) {
// Buckets and tables missing in the target project are created through this helper.
$this->storageModifier = new StorageModifier($this->targetClient);
}

public function migrate(Config $config): void
Expand All @@ -44,6 +51,20 @@ public function migrate(Config $config): void
continue;
}

if (!in_array($tableInfo['bucket']['id'], $this->bucketsExist) &&
!$this->targetClient->bucketExists($tableInfo['bucket']['id'])) {
$this->logger->info(sprintf('Creating bucket %s', $tableInfo['bucket']['id']));
$this->bucketsExist[] = $tableInfo['bucket']['id'];

$this->storageModifier->createBucket($tableInfo['bucket']['id']);
}

if (!$this->targetClient->tableExists($tableId)) {
$this->logger->info(sprintf('Creating table %s', $tableInfo['id']));

$this->storageModifier->createTable($tableInfo);
}

$this->migrateTable($tableInfo);
}
}
Expand Down Expand Up @@ -95,15 +116,25 @@ private function migrateTable(array $sourceTableInfo): void

private function getAllTables(): array
{
$buckets = $this->targetClient->listBuckets();
$buckets = $this->sourceClient->listBuckets();
$listTables = [];
foreach ($buckets as $bucket) {
$bucketTables = $this->targetClient->listTables($bucket['id']);
$sourceBucketTables = $this->sourceClient->listTables($bucket['id']);
if (!$this->targetClient->bucketExists($bucket['id'])) {
$targetBucketTables = [];
} else {
$targetBucketTables = $this->targetClient->listTables($bucket['id']);
}

// migrate only empty tables
$filteredBucketTables = array_filter(
$bucketTables,
fn($v) => $v['rowsCount'] === 0 || is_null($v['rowsCount']),
$sourceBucketTables,
function ($sourceTable) use ($targetBucketTables) {
$v = current(array_filter(
$targetBucketTables,
fn($v) => $v['id'] === $sourceTable['id'],
));
return empty($v) || $v['rowsCount'] === 0 || is_null($v['rowsCount']);
},
);

$listTables = array_merge(
Expand Down