Skip to content

Commit

Permalink
fix(sync): add some more helpers to the sync context to simplify usage
Browse files Browse the repository at this point in the history
  • Loading branch information
djMax committed Jan 14, 2024
1 parent efc951b commit d2e603f
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 6 deletions.
20 changes: 15 additions & 5 deletions src/dbs/kysely.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { type Kysely, type AnyColumn, sql } from 'kysely';
import type { ClickHouseClient } from '@clickhouse/client';

import type { Bookmark, ClickhouseRowRecord, RowMapper, SourceDatabaseRowRecord } from '../types';
import type { Bookmark, ClickhouseRowRecord, RowMapper, SourceDatabaseRowRecord, SyncResult } from '../types';
import { synchronizeTable } from '../stream-copy';

type HasUpdatedAt<ColName extends string> = {
Expand Down Expand Up @@ -156,7 +156,7 @@ export async function copyTable<
/**
* Synchronize multiple tables with simpler syntax.
*/
export class KyselySyncContext<Schema, B extends Record<string, Bookmark<string> | Bookmark<number>>> {
export class KyselySyncContext<Schema, B extends Partial<Record<keyof Schema, Bookmark<string> | Bookmark<number>>>> {
// Log function that can be overridden
log: (level: 'info' | 'error', message: string, meta?: Record<string, unknown>) => void = () => { };
// The default name of the created_at column
Expand All @@ -166,6 +166,11 @@ export class KyselySyncContext<Schema, B extends Record<string, Bookmark<string>
// The default name of the primary key column based on the table name (the default strips a plural 's' from the end and adds _id)
getDefaultPrimaryKeyColumn = (table: string) => `${table.slice(0, -1)}_id`;

// This will be filled out with the results of the syncs in such a way that it can be used
// as a collective bookmark for the next sync. You can also get the individual bookmarks
// from the return value of the individual functions.
results = {} as Record<string, Bookmark<string | number>>;

constructor(
private readonly db: Kysely<Schema>,
private readonly clickhouse: ClickHouseClient,
Expand All @@ -175,7 +180,7 @@ export class KyselySyncContext<Schema, B extends Record<string, Bookmark<string>
async table<T extends keyof Schema & string>(table: T, opts: {
pk?: AnyColumn<Schema, T>,
timestampColumn?: AnyColumn<Schema, T>,
}) {
}): Promise<SyncResult<T, string | number>> {
const { pk, timestampColumn } = opts;
return syncTable(
this.db,
Expand All @@ -190,7 +195,12 @@ export class KyselySyncContext<Schema, B extends Record<string, Bookmark<string>
})
.then((result) => {
this.log('info', 'Sync complete', { table, rows: result.rows });
return [table, { ...result.bookmark, lastCount: result.rows }];
const newBookmark = { ...result.bookmark, lastCount: result.rows };
this.results[table as string] = newBookmark;
return {
table,
bookmark: newBookmark,
};
})
.catch((error) => {
this.log('error', 'Failed to copy table', { table, error });
Expand Down Expand Up @@ -223,4 +233,4 @@ export class KyselySyncContext<Schema, B extends Record<string, Bookmark<string>
...opts,
});
}
}
}
8 changes: 7 additions & 1 deletion src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ export type ClickhouseRowRecord = Record<string, unknown>;
export interface Bookmark<PK extends string | number> {
rowId?: PK;
rowTimestamp?: Date | null;
lastCount?: number;
}

export interface SyncResult<T extends string, PK extends string | number> {
table: T;
bookmark: Bookmark<PK>;
}

export type RowFetchFunction<T, PK extends string | number> = (bookmark: Bookmark<PK>, limit: number) => AsyncIterableIterator<T>;
Expand All @@ -20,7 +26,7 @@ interface BaseTableSyncSpec<T extends SourceDatabaseRowRecord, PK extends string
// returning a maximum of `limit` rows. If you return limit rows, the caller will assume
// there may be more, and call you again with a new bookmark
getRows: RowFetchFunction<T, PK>;
getBookmark(row: T): Bookmark<PK>;
getBookmark(row: T): Omit<Bookmark<PK>, 'lastCount'>;
// Defaults to 10,000 but if you want precise control over the select size, you can set it here
pageSize?: number;
// I would prefer this was row: T, but it just complicates the type system too much
Expand Down

0 comments on commit d2e603f

Please sign in to comment.