Skip to content

Commit

Permalink
fix(test): add a test for the sync context, clean up some bugs with it
Browse files Browse the repository at this point in the history
  • Loading branch information
djMax committed Jan 14, 2024
1 parent 6cc3bce commit 5e60e99
Show file tree
Hide file tree
Showing 5 changed files with 96 additions and 41 deletions.
4 changes: 2 additions & 2 deletions __tests__/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ describe('move tables from postgres to clickhouse', () => {
},
insert(stream) {
return ch.insert({
table: 'identity__address_types',
table: 'address_types',
values: stream,
format: 'JSONEachRow',
});
Expand All @@ -75,7 +75,7 @@ describe('move tables from postgres to clickhouse', () => {
};
},
clickhouse: ch,
tableName: 'identity__address_types',
tableName: 'address_types',
}, {});
expect(detail.rows).toBe(2);
});
Expand Down
57 changes: 54 additions & 3 deletions __tests__/kysely.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { afterEach, beforeEach, describe, expect, test } from 'vitest';
import { ClickHouseClient } from '@clickhouse/client';
import { Kysely } from 'kysely';

import { copyTable, syncTable } from '../src/dbs/kysely';
import { KyselySyncContext, copyTable, syncTable } from '../src/dbs/kysely';

import { createChDb, createPgDb } from './db.fixtures';
import { kyselyDb } from './kysely.fixtures';
Expand Down Expand Up @@ -30,14 +30,14 @@ describe('simple kysely interface', () => {
test('should sync to clickhouse', async () => {
const detail = await copyTable(db, ch, {}, {
from: 'address_types',
to: 'identity__address_types',
to: 'address_types',
pk: 'address_type_id',
});
expect(detail.rows).toBe(2);

const indSpec = {
from: 'individuals',
to: 'identity__individuals',
to: 'individuals',
pk: 'individual_id',
delaySeconds: 0,
} as const;
Expand All @@ -62,4 +62,55 @@ describe('simple kysely interface', () => {
const upd2 = await syncTable(db, ch, upd.bookmark, indSpec);
expect(upd2.rows, 'Copy 2 rows after second update').toBe(2);
});

test('should sync to clickhouse with simplified syntax', async () => {
await db.insertInto('individuals')
.columns(['updated_at'])
.values([{
updated_at: new Date('2024-01-01T00:00:00.000Z'),
}])
.execute();
let context = new KyselySyncContext(db, ch, {});
await Promise.all([
context.forwardOnly('address_types'),
context.withUpdatedAt('individuals'),
]);
expect(context.results).toMatchInlineSnapshot(`
{
"address_types": {
"lastCount": 2,
"rowId": 2,
},
"individuals": {
"lastCount": 4,
"rowId": "4",
"rowTimestamp": "2024-01-01T00:00:00.000",
},
}
`);
context = new KyselySyncContext(db, ch, context.results);
const secondPass = await Promise.all([
context.forwardOnly('address_types'),
context.withUpdatedAt('individuals'),
]);
expect(secondPass).toMatchInlineSnapshot(`
[
{
"bookmark": {
"lastCount": 0,
"rowId": 2,
},
"table": "address_types",
},
{
"bookmark": {
"lastCount": 1,
"rowId": "4",
"rowTimestamp": "2024-01-01T00:00:00.000",
},
"table": "individuals",
},
]
`);
});
});
4 changes: 2 additions & 2 deletions __tests__/migrations/1_test.sql
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
CREATE TABLE identity__address_types (
CREATE TABLE address_types (
address_type_id Int32,
name String,
created_at DateTime64
) ENGINE = ReplacingMergeTree(created_at)
ORDER BY address_type_id;

CREATE TABLE identity__individuals (
CREATE TABLE individuals (
individual_id Int32,
individual_uuid String,
favorite_color String,
Expand Down
67 changes: 38 additions & 29 deletions src/dbs/kysely.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { type Kysely, type AnyColumn, sql } from 'kysely';
import type { ClickHouseClient } from '@clickhouse/client';

import type { Bookmark, ClickhouseRowRecord, RowMapper, SourceDatabaseRowRecord, SyncResult } from '../types';
import type { Bookmark, ClickhouseRowRecord, RowMapper, SourceDatabaseRowRecord } from '../types';
import { synchronizeTable } from '../stream-copy';

type HasUpdatedAt<ColName extends string> = {
Expand Down Expand Up @@ -112,7 +112,7 @@ export async function copyTable<
from: T;
to: string;
pk: PK;
optimize: boolean;
optimize?: boolean;
rowMapper?: RowMapper;
},
) {
Expand Down Expand Up @@ -159,8 +159,6 @@ export async function copyTable<
export class KyselySyncContext<Schema, B extends Partial<Record<keyof Schema, Bookmark<string> | Bookmark<number>>>> {
// Log function that can be overridden
log: (level: 'info' | 'error', message: string, meta?: Record<string, unknown>) => void = () => { };
// The default name of the created_at column
createdAtColumn = 'created_at';
// The default name of the updated_at column
updatedAtColumn = 'updated_at';
// The default name of the primary key column based on the table name (the default strips a plural 's' from the end and adds _id)
Expand All @@ -177,26 +175,27 @@ export class KyselySyncContext<Schema, B extends Partial<Record<keyof Schema, Bo
private readonly bookmark?: B,
) { }

async table<T extends keyof Schema & string>(table: T, opts: {
/**
* Sync a table that only gets additions, no updates (or update tracking)
*/
async forwardOnly<T extends keyof Schema & string>(table: T, opts?: {
pk?: AnyColumn<Schema, T>,
timestampColumn?: AnyColumn<Schema, T>,
rowMapper?: RowMapper,
}): Promise<SyncResult<T, string | number>> {
const { pk, timestampColumn, rowMapper } = opts;
return syncTable(
}) {
const { pk } = opts || {};
return copyTable(
this.db,
this.clickhouse,
(this.bookmark?.[table as keyof B] || {}) as Bookmark<string | number>,
{
...opts,
from: table,
to: table,
pk: (pk || this.getDefaultPrimaryKeyColumn(table)) as AnyColumn<Schema, T>,
optimize: true,
timestampColumn,
rowMapper,
})
.then((result) => {
this.log('info', 'Sync complete', { table, rows: result.rows });
this.log('info', 'Copy complete', { table, rows: result.rows });
const newBookmark = { ...result.bookmark, lastCount: result.rows };
this.results[table as string] = newBookmark;
return {
Expand All @@ -210,29 +209,39 @@ export class KyselySyncContext<Schema, B extends Partial<Record<keyof Schema, Bo
});
}

/**
* Sync a table that only gets additions, no updates (or update tracking)
*/
async forwardOnly<T extends keyof Schema & string>(table: T, opts?: {
pk?: AnyColumn<Schema, T>,
rowMapper?: RowMapper,
}) {
return this.table(table, {
timestampColumn: this.createdAtColumn as AnyColumn<Schema, T>,
...opts,
});
}

/**
* Sync a table that tracks its updates with an updated_at column
*/
async withUpdatedAt<T extends keyof Schema & string>(table: T, opts?: {
pk?: AnyColumn<Schema, T>,
timestampColumn?: AnyColumn<Schema, T>,
rowMapper?: RowMapper,
}) {
return this.table(table, {
timestampColumn: this.updatedAtColumn as AnyColumn<Schema, T>,
...opts,
});
const { pk, timestampColumn } = opts || {};
return syncTable(
this.db,
this.clickhouse,
(this.bookmark?.[table as keyof B] || {}) as Bookmark<string | number>,
{
...opts,
from: table,
to: table,
pk: (pk || this.getDefaultPrimaryKeyColumn(table)) as AnyColumn<Schema, T>,
optimize: true,
timestampColumn: timestampColumn || this.updatedAtColumn as AnyColumn<Schema, T>,
})
.then((result) => {
this.log('info', 'Sync complete', { table, rows: result.rows });
const newBookmark = { ...result.bookmark, lastCount: result.rows };
this.results[table as string] = newBookmark;
return {
table,
bookmark: newBookmark,
};
})
.catch((error) => {
this.log('error', 'Failed to sync table', { table, error });
throw error;
});
}
}
5 changes: 0 additions & 5 deletions src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,6 @@ export interface Bookmark<PK extends string | number> {
lastCount?: number;
}

export interface SyncResult<T extends string, PK extends string | number> {
table: T;
bookmark: Bookmark<PK>;
}

export type RowFetchFunction<T, PK extends string | number> = (bookmark: Bookmark<PK>, limit: number) => AsyncIterableIterator<T>;

export type RowMapper = (row: SourceDatabaseRowRecord) => ClickhouseRowRecord;
Expand Down

0 comments on commit 5e60e99

Please sign in to comment.